Skip to content

Commit

Permalink
ref: Bring back handle_processing
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Jul 23, 2024
1 parent e967e27 commit e9434ab
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 31 deletions.
107 changes: 79 additions & 28 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,68 @@ impl ProjectCacheBroker {
project.check_envelope(context)
}

/// Handles the processing of the provided envelope.
fn handle_processing(&mut self, key: QueueKey, managed_envelope: ManagedEnvelope) {
let project_key = managed_envelope.envelope().meta().public_key();

let Some(project) = self.projects.get_mut(&project_key) else {
relay_log::error!(
tags.project_key = %project_key,
"project could not be found in the cache",
);

let mut project = Project::new(project_key, self.config.clone());
project.prefetch(self.services.project_cache.clone(), false);
self.projects.insert(project_key, project);
self.enqueue(key, managed_envelope);
return;
};

let project_cache = self.services.project_cache.clone();
let Some(project_info) = project
.get_cached_state(project_cache.clone(), false)
.enabled()
else {
// TODO: The caller should already send the valid project info with the message.
relay_log::error!(
tags.project_key = %project_key,
"project has no valid cached state",
);
return;
};

// The `Envelope` and `EnvelopeContext` will be dropped if the `Project::check_envelope()`
// function returns any error, which will also be ignored here.
// TODO(jjbayer): check_envelope also makes sure the envelope has proper scoping.
// If we don't call check_envelope in the same message handler as handle_processing,
// there is a chance that the project is not ready yet and events are published with
// `organization_id: 0`. We should eliminate this footgun by introducing a `ScopedEnvelope`
// type which guarantees that the envelope has complete scoping.
if let Ok(CheckedEnvelope {
envelope: Some(managed_envelope),
..
}) = project.check_envelope(managed_envelope)
{
let reservoir_counters = project.reservoir_counters();

let sampling_project_info = managed_envelope
.envelope()
.sampling_key()
.and_then(|key| self.projects.get_mut(&key))
.and_then(|p| p.get_cached_state(project_cache, false).enabled())
.filter(|state| state.organization_id == project_info.organization_id);

let process = ProcessEnvelope {
envelope: managed_envelope,
project_info,
sampling_project_info,
reservoir_counters,
};

self.services.envelope_processor.send(process);
}
}

/// Checks an incoming envelope and decides either process it immediately or buffer it.
///
/// Few conditions are checked here:
Expand Down Expand Up @@ -798,7 +860,6 @@ impl ProjectCacheBroker {

let project_state =
project.get_cached_state(project_cache.clone(), envelope.meta().no_cache());
let reservoir_counters = project.reservoir_counters();

let project_state = match project_state {
ProjectState::Enabled(state) => Some(state),
Expand Down Expand Up @@ -833,27 +894,21 @@ impl ProjectCacheBroker {
None
};

let key = QueueKey::new(own_key, sampling_key.unwrap_or(own_key));

// Trigger processing once we have a project state and we either have a sampling project
// state or we do not need one.
if let Some(project_state) = project_state {
if (sampling_state.is_some() || !requires_sampling_state)
&& self.memory_checker.check_memory().has_capacity()
&& self.global_config.is_ready()
{
relay_log::trace!("Sending envelope to processor");
self.services.envelope_processor.send(ProcessEnvelope {
envelope: managed_envelope,
project_info: project_state,
sampling_project_info: sampling_state,
reservoir_counters,
});

return;
}
if project_state.is_some()
&& (sampling_state.is_some() || !requires_sampling_state)
&& self.memory_checker.check_memory().has_capacity()
&& self.global_config.is_ready()
{
// TODO: Add ready project infos to the processing message.
relay_log::trace!("Sending envelope to processor");
return self.handle_processing(key, managed_envelope);
}

relay_log::trace!("Enqueueing envelope");
let key = QueueKey::new(own_key, sampling_key.unwrap_or(own_key));
self.enqueue(key, managed_envelope);
}

Expand Down Expand Up @@ -1207,16 +1262,10 @@ impl Service for ProjectCacheService {
}
// Buffer will not dequeue the envelopes from the spool if there is not enough
// permits in `BufferGuard` available. Currently this is 50%.
Some(UnspooledEnvelope { managed_envelope }) = buffer_rx.recv() => {
// Unspooled envelopes need to be checked, just like we do on the fast path.
if let Ok(CheckedEnvelope { envelope: Some(envelope), rate_limits: _ }) = metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_check_envelope", {
broker.handle_check_envelope(CheckEnvelope::new(managed_envelope))
}) {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_validate_envelope", {
broker.handle_validate_envelope(ValidateEnvelope { envelope });
Some(UnspooledEnvelope { managed_envelope, key }) = buffer_rx.recv() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_processing", {
broker.handle_processing(key, managed_envelope)
})
}

},
_ = ticker.tick() => {
metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "evict_project_caches", {
Expand Down Expand Up @@ -1469,15 +1518,17 @@ mod tests {

// Since there is no project we should not process anything but create a project and spool
// the envelope.
broker.handle_validate_envelope(ValidateEnvelope { envelope });
broker.handle_processing(key, envelope);

// Assert that we have a new project and also added an index.
assert!(broker.projects.get(&project_key).is_some());
assert!(broker.index.contains(&key));

// Check is we actually spooled anything.
buffer_svc.send(DequeueMany::new([key].into(), buffer_tx.clone()));
let UnspooledEnvelope { managed_envelope } = buffer_rx.recv().await.unwrap();
let UnspooledEnvelope {
managed_envelope, ..
} = buffer_rx.recv().await.unwrap();

assert_eq!(key, QueueKey::from_envelope(managed_envelope.envelope()));
}
Expand Down
16 changes: 13 additions & 3 deletions relay-server/src/services/spooler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl QueueKey {
/// It's sent in response to [`DequeueMany`] message from the [`ProjectCache`].
#[derive(Debug)]
pub struct UnspooledEnvelope {
pub key: QueueKey,
pub managed_envelope: ManagedEnvelope,
}

Expand Down Expand Up @@ -375,6 +376,7 @@ impl InMemory {
self.envelope_count = self.envelope_count.saturating_sub(1);
sender
.send(UnspooledEnvelope {
key,
managed_envelope: envelope,
})
.ok();
Expand Down Expand Up @@ -584,9 +586,14 @@ impl OnDisk {
};

match self.extract_envelope(envelope, services) {
Ok((_key, managed_envelopes)) => {
Ok((key, managed_envelopes)) => {
for managed_envelope in managed_envelopes {
sender.send(UnspooledEnvelope { managed_envelope }).ok();
sender
.send(UnspooledEnvelope {
key,
managed_envelope,
})
.ok();
}
}
Err(err) => relay_log::error!(
Expand Down Expand Up @@ -1431,7 +1438,10 @@ mod tests {
sender: tx.clone(),
});

let UnspooledEnvelope { managed_envelope } = rx.recv().await.unwrap();
let UnspooledEnvelope {
key: _,
managed_envelope,
} = rx.recv().await.unwrap();
let start_time_received = managed_envelope.envelope().meta().start_time();

// Check if the original start time elapsed to the same second as the restored one.
Expand Down

0 comments on commit e9434ab

Please sign in to comment.