From 29985ca63d07cd8879eeba348ef7e3da798547e9 Mon Sep 17 00:00:00 2001
From: Oleksandr <1931331+olksdr@users.noreply.github.com>
Date: Fri, 12 Jan 2024 13:55:06 +0100
Subject: [PATCH] ref(processor): Introduce processing groups pipelines (#2862)

This change takes another step towards a typed processing pipeline:
* split the envelope into different processing groups where possible
* for transactions, keep all related items together in the same envelope
* keep all items which create events together as well
* move the rest of the items into separate groups, depending on their target
---
 relay-server/src/actors/processor.rs | 363 +++++++++++++++++-
 .../src/actors/processor/dynamic_sampling.rs | 6 +-
 relay-server/src/actors/project_cache.rs | 2 +
 relay-server/src/actors/spooler/mod.rs | 28 +-
 relay-server/src/endpoints/common.rs | 43 +--
 relay-server/src/envelope.rs | 56 +--
 relay-server/src/testutils.rs | 4 +-
 relay-server/src/utils/buffer.rs | 3 +
 relay-server/src/utils/managed_envelope.rs | 28 +-
 9 files changed, 437 insertions(+), 96 deletions(-)

diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs
index 33d5ae50cc..3b2b9c8c75 100644
--- a/relay-server/src/actors/processor.rs
+++ b/relay-server/src/actors/processor.rs
@@ -25,7 +25,7 @@ use relay_event_normalization::{
 use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo};
 use relay_event_schema::processor::ProcessingAction;
 use relay_event_schema::protocol::{
-    ClientReport, Event, EventType, IpAddr, Metrics, NetworkReportError,
+    ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError,
 };
 use relay_filter::FilterStatKey;
 use relay_metrics::aggregator::AggregatorConfig;
@@ -38,6 +38,7 @@ use relay_sampling::evaluation::{MatchedRuleIds, ReservoirCounters, ReservoirEva
 use relay_statsd::metric;
 use relay_system::{Addr, FromMessage, NoResponse, Service};
 use reqwest::header;
+use smallvec::{smallvec, SmallVec};
 use tokio::sync::Semaphore;

 #[cfg(feature = "processing")]
@@ -78,9 +79,158 @@ mod span;
 #[cfg(feature = "processing")]
 mod unreal;

+/// Runs the provided block only if compiled with the `processing` feature.
+///
+/// The provided code block will be executed only if the provided config has `processing_enabled` set.
+macro_rules! if_processing {
+    ($config:expr, $if_true:block) => {
+        #[cfg(feature = "processing")] {
+            if $config.processing_enabled() $if_true
+        }
+    };
+}
+
 /// The minimum clock drift for correction to apply.
 const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

+/// Describes the groups of the processable items.
+#[derive(Clone, Copy, Debug)]
+pub enum ProcessingGroup {
+    /// All the transaction-related items.
+    ///
+    /// Includes transactions, related attachments, profiles.
+    Transaction,
+    /// All the items which require (have or create) events.
+    ///
+    /// This includes: errors, NEL, security reports, user and client reports, some of the
+    /// attachments.
+    Error,
+    /// Session events.
+    Session,
+    /// Attachments which can be sent alone without any event attached to them in the current
+    /// envelope.
+    StandaloneAttachment,
+    UserReport,
+    Replay,
+    /// Crons.
+    CheckIn,
+    Span,
+    /// Unknown item types will be forwarded upstream (to a processing Relay), where we will
+    /// decide what to do with them.
+    ForwardUnknown,
+    /// All the items in the envelope we failed to group.
+    Ungrouped,
+}
+
+impl ProcessingGroup {
+    /// Splits the provided envelope into a list of tuples of groups with the associated envelopes.
+ pub fn split_envelope(mut envelope: Envelope) -> SmallVec<[(Self, Box); 3]> { + let headers = envelope.headers().clone(); + let mut grouped_envelopes = smallvec![]; + + // Each NEL item *must* have a dedicated envelope. + let nel_envelopes = envelope + .take_items_by(|item| matches!(item.ty(), &ItemType::Nel)) + .into_iter() + .map(|item| { + let headers = headers.clone(); + let items: SmallVec<[Item; 3]> = smallvec![item.clone()]; + let mut envelope = Envelope::from_parts(headers, items); + envelope.set_event_id(EventId::new()); + (ProcessingGroup::Error, envelope) + }); + grouped_envelopes.extend(nel_envelopes); + + // Extract all standalone attachments. + // Note: only if there is no items in the envelope which can create events. + if !envelope.items().any(Item::creates_event) { + let standalone_attachment_items = envelope.take_items_by(|item| { + matches!(item.ty(), &ItemType::Attachment | &ItemType::FormData) + }); + if !standalone_attachment_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::StandaloneAttachment, + Envelope::from_parts(headers.clone(), standalone_attachment_items), + )) + } + }; + + // Extract replays. + let replay_items = envelope.take_items_by(|item| { + matches!( + item.ty(), + &ItemType::ReplayEvent | &ItemType::ReplayRecording + ) + }); + if !replay_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::Replay, + Envelope::from_parts(headers.clone(), replay_items), + )) + } + + // Keep all the sessions together in one envelope. + let session_items = envelope + .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions)); + if !session_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::Session, + Envelope::from_parts(headers.clone(), session_items), + )) + } + + // Extract spans. + let span_items = envelope + .take_items_by(|item| matches!(item.ty(), &ItemType::Span | &ItemType::OtelSpan)); + if !span_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::Span, + Envelope::from_parts(headers.clone(), span_items), + )) + } + + // Extract all the items which require an event into separate envelope. + let require_event_items = envelope.take_items_by(Item::requires_event); + if !require_event_items.is_empty() { + let group = if require_event_items + .iter() + .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile)) + { + ProcessingGroup::Transaction + } else { + ProcessingGroup::Error + }; + grouped_envelopes.push(( + group, + Envelope::from_parts(headers.clone(), require_event_items), + )) + } + + // Get the rest of the envelopes, one per item. + let envelopes = envelope.items_mut().map(|item| { + let headers = headers.clone(); + let items: SmallVec<[Item; 3]> = smallvec![item.clone()]; + let envelope = Envelope::from_parts(headers, items); + let item_type = item.ty(); + let group = if matches!(item_type, &ItemType::CheckIn) { + ProcessingGroup::CheckIn + } else if matches!(item_type, &ItemType::UserReport | &ItemType::ClientReport) { + ProcessingGroup::UserReport + } else if matches!(item_type, &ItemType::Unknown(_)) { + ProcessingGroup::ForwardUnknown + } else { + // Cannot group this item type. + ProcessingGroup::Ungrouped + }; + + (group, envelope) + }); + grouped_envelopes.extend(envelopes); + + grouped_envelopes + } +} + /// An error returned when handling [`ProcessEnvelope`]. 
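For illustration, here is a minimal sketch (not part of the diff itself) of how `split_envelope` behaves on a mixed envelope. It leans on the helpers used in `testutils.rs`; the DSN string is a placeholder:

    // Build an envelope carrying a transaction, a session and a check-in.
    let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42"
        .parse()
        .unwrap();
    let mut envelope = Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn));
    envelope.add_item(Item::new(ItemType::Transaction));
    envelope.add_item(Item::new(ItemType::Session));
    envelope.add_item(Item::new(ItemType::CheckIn));

    // Splitting yields one envelope per group, all sharing the original headers:
    // the session, the transaction and the check-in each end up in their own
    // `Session`, `Transaction` and `CheckIn` envelope.
    for (group, part) in ProcessingGroup::split_envelope(*envelope) {
        relay_log::trace!("{group:?} envelope with {} item(s)", part.len());
    }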
#[derive(Debug, thiserror::Error)] pub enum ProcessingError { @@ -961,15 +1111,165 @@ impl EnvelopeProcessorService { Ok(()) } - fn process_state(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { - macro_rules! if_processing { - ($if_true:block) => { - #[cfg(feature = "processing")] { - if self.inner.config.processing_enabled() $if_true - } - }; + /// Processes the general errors, and the items which require or create the events. + fn process_errors(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + // Events can also contain user reports. + report::process( + state, + &self.inner.config, + self.inner.outcome_aggregator.clone(), + ); + + if_processing!(self.inner.config, { + unreal::expand(state, &self.inner.config)?; + }); + + event::extract(state, &self.inner.config)?; + + if_processing!(self.inner.config, { + unreal::process(state)?; + attachment::create_placeholders(state); + }); + + event::finalize(state, &self.inner.config)?; + self.light_normalize_event(state)?; + event::filter(state)?; + dynamic_sampling::tag_error_with_sampling_decision(state, &self.inner.config); + + if_processing!(self.inner.config, { + event::store(state, &self.inner.config, self.inner.geoip_lookup.as_ref())?; + self.enforce_quotas(state)?; + }); + + if state.has_event() { + event::scrub(state)?; + event::serialize(state)?; } + attachment::scrub(state); + + Ok(()) + } + + /// Processes only transactions and transaction-related items. + fn process_transactions( + &self, + state: &mut ProcessEnvelopeState, + ) -> Result<(), ProcessingError> { + profile::filter(state); + event::extract(state, &self.inner.config)?; + profile::transfer_id(state); + + if_processing!(self.inner.config, { + attachment::create_placeholders(state); + }); + + event::finalize(state, &self.inner.config)?; + self.light_normalize_event(state)?; + dynamic_sampling::normalize(state); + event::filter(state)?; + dynamic_sampling::run(state, &self.inner.config); + + // We avoid extracting metrics if we are not sampling the event while in non-processing + // relays, in order to synchronize rate limits on indexed and processed transactions. + if self.inner.config.processing_enabled() || state.sampling_result.should_drop() { + self.extract_metrics(state)?; + } + + dynamic_sampling::sample_envelope(state)?; + + if_processing!(self.inner.config, { + event::store(state, &self.inner.config, self.inner.geoip_lookup.as_ref())?; + self.enforce_quotas(state)?; + profile::process(state, &self.inner.config); + }); + + if state.has_event() { + event::scrub(state)?; + event::serialize(state)?; + if_processing!(self.inner.config, { + span::extract_from_event(state); + }); + } + + attachment::scrub(state); + Ok(()) + } + + /// Processes standalone attachments. + fn process_attachments(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + if_processing!(self.inner.config, { + self.enforce_quotas(state)?; + }); + + attachment::scrub(state); + Ok(()) + } + + /// Processes user sessions. + fn process_sessions(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + session::process(state, &self.inner.config); + if_processing!(self.inner.config, { + self.enforce_quotas(state)?; + }); + Ok(()) + } + + /// Processes user and client reports. 
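Since the per-group pipelines above lean heavily on `if_processing!`, it may help to see what a call site roughly amounts to. This is hand-expanded for clarity only; the real expansion comes from the `macro_rules!` definition earlier in this file:

    // A call such as:
    if_processing!(self.inner.config, {
        self.enforce_quotas(state)?;
    });

    // ...is roughly equivalent to:
    #[cfg(feature = "processing")]
    {
        if self.inner.config.processing_enabled() {
            self.enforce_quotas(state)?;
        }
    }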
+ fn process_user_reports( + &self, + state: &mut ProcessEnvelopeState, + ) -> Result<(), ProcessingError> { + if_processing!(self.inner.config, { + self.enforce_quotas(state)?; + }); + + report::process( + state, + &self.inner.config, + self.inner.outcome_aggregator.clone(), + ); + + Ok(()) + } + + /// Processes replays. + fn process_replays(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + replay::process(state, &self.inner.config)?; + if_processing!(self.inner.config, { + self.enforce_quotas(state)?; + }); + Ok(()) + } + + /// Processes cron check-ins. + fn process_checkins(&self, _state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + if_processing!(self.inner.config, { + self.enforce_quotas(_state)?; + self.process_check_ins(_state); + }); + Ok(()) + } + + /// Processes spans. + fn process_spans(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + span::filter(state); + if_processing!(self.inner.config, { + self.enforce_quotas(state)?; + span::process(state, self.inner.config.clone()); + }); + Ok(()) + } + + /// Legacy implementation of the `process_state` function, which is used for all + /// unknown [`ProcessingGroup`] variant. + /// + /// Note: this will be removed once we are confident in the new implementation and make sure + /// that all the groups properly covered. + fn process_state_legacy( + &self, + state: &mut ProcessEnvelopeState, + ) -> Result<(), ProcessingError> { session::process(state, &self.inner.config); report::process( state, @@ -985,14 +1285,14 @@ impl EnvelopeProcessorService { // This makes it possible to get in this code block while not really having an event in // the envelope. - if_processing!({ + if_processing!(self.inner.config, { unreal::expand(state, &self.inner.config)?; }); event::extract(state, &self.inner.config)?; profile::transfer_id(state); - if_processing!({ + if_processing!(self.inner.config, { unreal::process(state)?; attachment::create_placeholders(state); }); @@ -1011,12 +1311,12 @@ impl EnvelopeProcessorService { dynamic_sampling::sample_envelope(state)?; - if_processing!({ + if_processing!(self.inner.config, { event::store(state, &self.inner.config, self.inner.geoip_lookup.as_ref())?; }); } - if_processing!({ + if_processing!(self.inner.config, { self.enforce_quotas(state)?; profile::process(state, &self.inner.config); self.process_check_ins(state); @@ -1026,7 +1326,7 @@ impl EnvelopeProcessorService { if state.has_event() { event::scrub(state)?; event::serialize(state)?; - if_processing!({ + if_processing!(self.inner.config, { span::extract_from_event(state); }); } @@ -1036,6 +1336,42 @@ impl EnvelopeProcessorService { Ok(()) } + fn process_state(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + // Get the group from the managed envelope context, and if it's not set, try to guess it + // from the contents of the envelope. 
+ let group = state.managed_envelope.group(); + + relay_log::trace!("Processing {group:?} group"); + + match group { + ProcessingGroup::Error => self.process_errors(state)?, + ProcessingGroup::Transaction => self.process_transactions(state)?, + ProcessingGroup::Session => self.process_sessions(state)?, + ProcessingGroup::StandaloneAttachment => self.process_attachments(state)?, + ProcessingGroup::UserReport => self.process_user_reports(state)?, + ProcessingGroup::Replay => self.process_replays(state)?, + ProcessingGroup::CheckIn => self.process_checkins(state)?, + ProcessingGroup::Span => self.process_spans(state)?, + // Fallback to the legacy process_state implementation for Ungrouped events. + ProcessingGroup::Ungrouped => { + relay_log::error!( + tags.project = %state.project_id, + items = ?state.envelope().items().next().map(Item::ty), + "Could not identify the processing group based on the envelope's items" + ); + + // Call the legacy implementation of the `process_state` function. + self.process_state_legacy(state)?; + } + // Leave this group unchanged. + // + // This will later be forwarded to upstream. + ProcessingGroup::ForwardUnknown => (), + } + + Ok(()) + } + fn process( &self, message: ProcessEnvelope, @@ -1112,7 +1448,6 @@ impl EnvelopeProcessorService { let result = metric!(timer(RelayTimers::EnvelopeProcessingTime), { self.process(message) }); - match result { Ok(response) => { if let Some(envelope) = response.envelope { diff --git a/relay-server/src/actors/processor/dynamic_sampling.rs b/relay-server/src/actors/processor/dynamic_sampling.rs index 2700438a6f..4d958a3eac 100644 --- a/relay-server/src/actors/processor/dynamic_sampling.rs +++ b/relay-server/src/actors/processor/dynamic_sampling.rs @@ -184,7 +184,7 @@ fn compute_sampling_decision( /// /// This execution of dynamic sampling is technically a "simulation" since we will use the result /// only for tagging errors and not for actually sampling incoming events. -fn tag_error_with_sampling_decision(state: &mut ProcessEnvelopeState, config: &Config) { +pub fn tag_error_with_sampling_decision(state: &mut ProcessEnvelopeState, config: &Config) { let (Some(dsc), Some(event)) = ( state.managed_envelope.envelope().dsc(), state.event.value_mut(), @@ -240,7 +240,7 @@ mod tests { use relay_sampling::evaluation::{ReservoirCounters, SamplingMatch}; use uuid::Uuid; - use crate::actors::processor::ProcessEnvelope; + use crate::actors::processor::{ProcessEnvelope, ProcessingGroup}; use crate::actors::project::ProjectState; use crate::envelope::{ContentType, Envelope, Item, ItemType}; use crate::extractors::RequestMeta; @@ -274,6 +274,7 @@ mod tests { } } + /// Always sets the processing item type to event. 
fn process_envelope_with_root_project_state( envelope: Box, sampling_project_state: Option>, @@ -428,6 +429,7 @@ mod tests { TestSemaphore::new(42).try_acquire().unwrap(), outcome_aggregator.clone(), test_store.clone(), + ProcessingGroup::Ungrouped, ), profile_id: None, event_metrics_extracted: false, diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index bdd3a461c8..d55186cb99 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -1081,6 +1081,7 @@ mod tests { use relay_test::mock_service; use uuid::Uuid; + use crate::actors::processor::ProcessingGroup; use crate::testutils::empty_envelope; use super::*; @@ -1177,6 +1178,7 @@ mod tests { empty_envelope(), services.outcome_aggregator.clone(), services.test_store.clone(), + ProcessingGroup::Ungrouped, ) .unwrap(); let message = ValidateEnvelope { envelope }; diff --git a/relay-server/src/actors/spooler/mod.rs b/relay-server/src/actors/spooler/mod.rs index 34b8d762fb..8e9edead23 100644 --- a/relay-server/src/actors/spooler/mod.rs +++ b/relay-server/src/actors/spooler/mod.rs @@ -49,6 +49,7 @@ use tokio::fs::DirBuilder; use tokio::sync::mpsc; use crate::actors::outcome::TrackOutcome; +use crate::actors::processor::ProcessingGroup; use crate::actors::project_cache::{ProjectCache, UpdateBufferIndex}; use crate::actors::test_store::TestStore; use crate::envelope::{Envelope, EnvelopeError}; @@ -386,7 +387,7 @@ impl OnDisk { &self, row: SqliteRow, services: &Services, - ) -> Result { + ) -> Result, BufferError> { let envelope_row: Vec = row.try_get("envelope").map_err(BufferError::FetchFailed)?; let envelope_bytes = bytes::Bytes::from(envelope_row); let mut envelope = Envelope::parse_bytes(envelope_bytes)?; @@ -398,12 +399,18 @@ impl OnDisk { envelope.set_start_time(start_time.into_inner()); - let managed_envelope = self.buffer_guard.enter( - envelope, - services.outcome_aggregator.clone(), - services.test_store.clone(), - )?; - Ok(managed_envelope) + ProcessingGroup::split_envelope(*envelope) + .into_iter() + .map(|(group, envelope)| { + let managed_envelope = self.buffer_guard.enter( + envelope, + services.outcome_aggregator.clone(), + services.test_store.clone(), + group, + )?; + Ok(managed_envelope) + }) + .collect() } /// Tries to delete the envelopes from the persistent buffer in batches, @@ -467,8 +474,10 @@ impl OnDisk { }; match self.extract_envelope(envelope, services) { - Ok(managed_envelope) => { - sender.send(managed_envelope).ok(); + Ok(managed_envelopes) => { + for managed_envelope in managed_envelopes { + sender.send(managed_envelope).ok(); + } } Err(err) => relay_log::error!( error = &err as &dyn Error, @@ -1139,6 +1148,7 @@ mod tests { empty_envelope(), services.outcome_aggregator, services.test_store, + ProcessingGroup::Ungrouped, ) .unwrap(); diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 30100804b9..f2854bb7ef 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -8,7 +8,7 @@ use relay_statsd::metric; use serde::Deserialize; use crate::actors::outcome::{DiscardReason, Outcome}; -use crate::actors::processor::{ProcessMetricMeta, ProcessMetrics}; +use crate::actors::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup}; use crate::actors::project_cache::{CheckEnvelope, ValidateEnvelope}; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items}; use crate::service::ServiceState; @@ -286,44 +286,20 @@ fn 
queue_envelope( }) } - // Take all NEL reports and split them up into the separate envelopes with 1 item per - // envelope. - for nel_envelope in envelope.split_all_by(|item| matches!(item.ty(), ItemType::Nel)) { - relay_log::trace!("queueing separate envelopes for NEL report"); - let buffer_guard = state.buffer_guard(); - let nel_envelope = buffer_guard + // Split off the envelopes by item type. + let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope()); + for (group, envelope) in envelopes { + let envelope = buffer_guard .enter( - nel_envelope, + envelope, state.outcome_aggregator().clone(), state.test_store().clone(), + group, ) .map_err(BadStoreRequest::QueueFailed)?; - state - .project_cache() - .send(ValidateEnvelope::new(nel_envelope)); - } - - // Split the envelope into event-related items and other items. This allows to fast-track: - // 1. Envelopes with only session items. They only require rate limiting. - // 2. Event envelope processing can bail out if the event is filtered or rate limited, - // since all items depend on this event. - if let Some(event_envelope) = envelope.split_by(Item::requires_event) { - relay_log::trace!("queueing separate envelope for non-event items"); - - // The envelope has been split, so we need to fork the context. - let event_context = buffer_guard.enter( - event_envelope, - state.outcome_aggregator().clone(), - state.test_store().clone(), - )?; - state - .project_cache() - .send(ValidateEnvelope::new(event_context)); + state.project_cache().send(ValidateEnvelope::new(envelope)); } - // Update the old context before continuing with the source envelope. - managed_envelope.update(); - if managed_envelope.envelope().is_empty() { // The envelope can be empty here if it contained only metrics items which were removed // above. In this case, the envelope was accepted and needs no further queueing. @@ -356,6 +332,9 @@ pub async fn handle_envelope( envelope, state.outcome_aggregator().clone(), state.test_store().clone(), + // It's not clear at this point which group this envelope belongs to. + // The decission will be made while queueing in `queue_envelope` function. + ProcessingGroup::Ungrouped, ) .map_err(BadStoreRequest::QueueFailed)?; diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 95f4706005..c89c8129e8 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -47,7 +47,7 @@ use relay_quotas::DataCategory; use relay_sampling::DynamicSamplingContext; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use smallvec::{smallvec, SmallVec}; +use smallvec::SmallVec; use crate::constants::DEFAULT_EVENT_RETENTION; use crate::extractors::{PartialMeta, RequestMeta}; @@ -945,6 +945,11 @@ pub struct Envelope { } impl Envelope { + /// Creates an envelope from the provided parts. + pub fn from_parts(headers: EnvelopeHeaders, items: Items) -> Box { + Box::new(Self { items, headers }) + } + /// Creates an envelope from request information. pub fn from_request(event_id: Option, meta: RequestMeta) -> Box { Box::new(Self { @@ -1000,6 +1005,11 @@ impl Envelope { } } + /// Returns reference to the [`EnvelopeHeaders`]. + pub fn headers(&self) -> &EnvelopeHeaders { + &self.headers + } + /// Returns the number of items in this envelope. #[allow(dead_code)] pub fn len(&self) -> usize { @@ -1045,6 +1055,11 @@ impl Envelope { self.headers.sent_at } + /// Sets the event id on the envelope. 
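The new `from_parts` and `headers` accessors are what let `split_envelope` hand every split-off part a copy of the original envelope headers. A small sketch of that invariant (illustrative only; it assumes the pre-existing `event_id()` accessor on `Envelope` and an `original: Box<Envelope>` in scope):

    // Clone the original headers and attach them to a newly split-off part.
    let headers = original.headers().clone();
    let items: SmallVec<[Item; 3]> = smallvec![Item::new(ItemType::ClientReport)];
    let part = Envelope::from_parts(headers, items);

    // The part inherits the event id (and the other headers) from the original,
    // unless a new one is set explicitly via `set_event_id`, as done for NEL items.
    assert_eq!(part.event_id(), original.event_id());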
+ pub fn set_event_id(&mut self, event_id: EventId) { + self.headers.event_id = Some(event_id); + } + /// Sets the timestamp at which an envelope is sent to the upstream. pub fn set_sent_at(&mut self, sent_at: DateTime) { self.headers.sent_at = Some(sent_at); @@ -1110,7 +1125,7 @@ impl Envelope { self.items.iter_mut() } - /// Returns the an option with a reference to the first item that matches + /// Returns an option with a reference to the first item that matches /// the predicate, or None if the predicate is not matched by any item. pub fn get_item_by(&self, mut pred: F) -> Option<&Item> where @@ -1119,7 +1134,7 @@ impl Envelope { self.items().find(|item| pred(item)) } - /// Returns the an option with a mutable reference to the first item that matches + /// Returns an option with a mutable reference to the first item that matches /// the predicate, or None if the predicate is not matched by any item. pub fn get_item_by_mut(&mut self, mut pred: F) -> Option<&mut Item> where @@ -1152,8 +1167,9 @@ impl Envelope { /// Splits off the items from the envelope using provided predicates. /// - /// First predicate is the the additional condition on the count of found items by second + /// First predicate is the additional condition on the count of found items by second /// predicate. + #[cfg(test)] fn split_off_items(&mut self, cond: C, mut f: F) -> Option> where C: Fn(usize) -> bool, @@ -1178,6 +1194,7 @@ impl Envelope { /// with all items that return `true`. Items that return `false` remain in this envelope. /// /// The returned envelope assumes the same headers. + #[cfg(test)] pub fn split_by(&mut self, f: F) -> Option> where F: FnMut(&Item) -> bool, @@ -1190,37 +1207,6 @@ impl Envelope { })) } - /// Splits the envelope by the given predicate. - /// - /// The main differents from `split_by()` is this function returns the list of the newly - /// constracted envelopes with all the items where the predicate returns `true`. Otherwise it - /// returns an empty list. - /// - /// The returned envelopes assume the same headers. - pub fn split_all_by(&mut self, f: F) -> SmallVec<[Box; 3]> - where - F: FnMut(&Item) -> bool, - { - let mut envelopes = smallvec![]; - let Some(split_items) = self.split_off_items(|count| count == 0, f) else { - return envelopes; - }; - - let headers = &mut self.headers; - - for item in split_items { - // Each item should get an envelope with the new event id. - headers.event_id = Some(EventId::new()); - - envelopes.push(Box::new(Envelope { - items: smallvec![item], - headers: headers.clone(), - })) - } - - envelopes - } - /// Retains only the items specified by the predicate. /// /// In other words, remove all elements where `f(&item)` returns `false`. 
This method operates diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index d377d0683d..5155d1e0f9 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -101,7 +101,9 @@ pub fn empty_envelope() -> Box { .parse() .unwrap(); - Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn)) + let mut envelope = Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn)); + envelope.add_item(Item::new(ItemType::Event)); + envelope } pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { diff --git a/relay-server/src/utils/buffer.rs b/relay-server/src/utils/buffer.rs index a063d37a00..b5fb54c6a3 100644 --- a/relay-server/src/utils/buffer.rs +++ b/relay-server/src/utils/buffer.rs @@ -3,6 +3,7 @@ use std::fmt; use relay_system::Addr; use crate::actors::outcome::TrackOutcome; +use crate::actors::processor::ProcessingGroup; use crate::actors::test_store::TestStore; use crate::envelope::Envelope; use crate::statsd::RelayHistograms; @@ -92,6 +93,7 @@ impl BufferGuard { envelope: Box, outcome_aggregator: Addr, test_store: Addr, + group: ProcessingGroup, ) -> Result { let permit = self.inner.try_acquire().ok_or(BufferError)?; @@ -109,6 +111,7 @@ impl BufferGuard { permit, outcome_aggregator, test_store, + group, )) } } diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index b41cc438e9..d9b1fe618a 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -8,6 +8,7 @@ use relay_quotas::{DataCategory, Scoping}; use relay_system::Addr; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; +use crate::actors::processor::ProcessingGroup; use crate::actors::test_store::{Capture, TestStore}; use crate::envelope::{Envelope, Item}; use crate::extractors::RequestMeta; @@ -61,6 +62,7 @@ struct EnvelopeContext { slot: Option, partition_key: Option, done: bool, + group: ProcessingGroup, } /// Tracks the lifetime of an [`Envelope`] in Relay. @@ -93,6 +95,7 @@ impl ManagedEnvelope { slot: Option, outcome_aggregator: Addr, test_store: Addr, + group: ProcessingGroup, ) -> Self { let meta = &envelope.meta(); let summary = EnvelopeSummary::compute(envelope.as_ref()); @@ -105,6 +108,7 @@ impl ManagedEnvelope { slot, partition_key: None, done: false, + group, }, outcome_aggregator, test_store, @@ -117,7 +121,13 @@ impl ManagedEnvelope { outcome_aggregator: Addr, test_store: Addr, ) -> Self { - let mut envelope = Self::new_internal(envelope, None, outcome_aggregator, test_store); + let mut envelope = Self::new_internal( + envelope, + None, + outcome_aggregator, + test_store, + ProcessingGroup::Ungrouped, + ); envelope.context.done = true; envelope } @@ -134,7 +144,13 @@ impl ManagedEnvelope { outcome_aggregator: Addr, test_store: Addr, ) -> Self { - Self::new_internal(envelope, None, outcome_aggregator, test_store) + Self::new_internal( + envelope, + None, + outcome_aggregator, + test_store, + ProcessingGroup::Ungrouped, + ) } /// Computes a managed envelope from the given envelope and binds it to the processing queue. @@ -145,8 +161,9 @@ impl ManagedEnvelope { slot: SemaphorePermit, outcome_aggregator: Addr, test_store: Addr, + group: ProcessingGroup, ) -> Self { - Self::new_internal(envelope, Some(slot), outcome_aggregator, test_store) + Self::new_internal(envelope, Some(slot), outcome_aggregator, test_store, group) } /// Returns a reference to the contained [`Envelope`]. 
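With the `Event` item added to `empty_envelope()` above, the test envelope now maps to a single processing group. A quick sketch of what that implies (illustrative, not part of the diff):

    // An envelope containing only an `Event` item splits into exactly one
    // envelope, grouped as `Error` (events that are not transactions/profiles).
    let groups = ProcessingGroup::split_envelope(*empty_envelope());
    assert_eq!(groups.len(), 1);
    assert!(matches!(groups[0].0, ProcessingGroup::Error));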
@@ -154,6 +171,11 @@ impl ManagedEnvelope {
         self.envelope.as_ref()
     }

+    /// Returns the [`ProcessingGroup`] this envelope belongs to.
+    pub fn group(&self) -> ProcessingGroup {
+        self.context.group
+    }
+
     /// Returns a mutable reference to the contained [`Envelope`].
     pub fn envelope_mut(&mut self) -> &mut Envelope {
         self.envelope.as_mut()
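Taken together, the group is decided once, either in `queue_envelope` or when the spooler re-admits an envelope, and is then carried by the `ManagedEnvelope` for the rest of its lifetime. A small sketch of how downstream code can rely on the stored group instead of re-inspecting items (illustrative only, assuming a `managed_envelope` obtained from one of the call sites above):

    // `ProcessingGroup` is `Copy`, so reading it is cheap.
    match managed_envelope.group() {
        // Unknown item types are intentionally left untouched and forwarded upstream.
        ProcessingGroup::ForwardUnknown => (),
        group => relay_log::trace!("handling {group:?} envelope"),
    }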