Revert "feat(spool): Avoid evicting old project for incoming envelope…
Browse files Browse the repository at this point in the history
…s" (#2743)

This should be a temporary revert to validate the assumption that this change
could cause Relay to keep some of the incoming envelopes in memory for a longer
period of time, which can cause a very slow memory leak.
olksdr authored Nov 20, 2023
1 parent 8b22c6e commit 2757887
Showing 3 changed files with 19 additions and 108 deletions.
2 changes: 1 addition & 1 deletion relay-server/Cargo.toml
@@ -118,7 +118,7 @@ uuid = { workspace = true, features = ["v5"] }
zstd = { version = "0.12.3", optional = true }

[dev-dependencies]
-tokio = { workspace = true, features = ["test-util", "time"] }
+tokio = { workspace = true, features = ['test-util'] }
insta = { workspace = true }
relay-event-schema = { path = "../relay-event-schema", features = [
"jsonschema",
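Note that the dropped `time` feature was used by the eviction test removed in this revert (see `project_cache.rs` below), which paused and advanced Tokio's clock. A self-contained sketch of that testing pattern, independent of Relay's types:

```rust
use std::time::Duration;

// Assumes tokio with features = ["macros", "rt", "time", "test-util"].
#[tokio::test]
async fn clock_can_be_advanced_manually() {
    // Freeze the runtime clock; timers only move when advanced explicitly.
    tokio::time::pause();
    let start = tokio::time::Instant::now();
    // Jump the paused clock forward without actually sleeping.
    tokio::time::advance(Duration::from_secs(2)).await;
    assert!(start.elapsed() >= Duration::from_secs(2));
}
```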
12 changes: 1 addition & 11 deletions relay-server/src/actors/project.rs
@@ -430,7 +430,6 @@ pub struct Project {
backoff: RetryBackoff,
next_fetch_attempt: Option<Instant>,
last_updated_at: Instant,
-last_envelope_seen: Instant,
project_key: ProjectKey,
config: Arc<Config>,
state: State,
@@ -447,7 +446,6 @@ impl Project {
backoff: RetryBackoff::new(config.http_max_retry_interval()),
next_fetch_attempt: None,
last_updated_at: Instant::now(),
-last_envelope_seen: Instant::now(),
project_key: key,
state: State::new(config.permissive_aggregator_config()),
state_channel: None,
@@ -536,11 +534,6 @@ impl Project {
self.last_updated_at
}

-/// The last time that this project was used for an incoming envelope.
-pub fn last_envelope_seen_at(&self) -> Instant {
-self.last_envelope_seen
-}
-
/// Refresh the update time of the project in order to delay eviction.
///
/// Called by the project cache when the project state is refreshed.
@@ -983,9 +976,6 @@ impl Project {
let state = self.valid_state().filter(|state| !state.invalid());
let mut scoping = envelope.scoping();

-// On every incoming envelope, which belongs to this project, update when it was last seen.
-self.last_envelope_seen = envelope.start_time().into();
-
if let Some(ref state) = state {
scoping = state.scope_request(envelope.envelope().meta());
envelope.scope(scoping);
@@ -1139,7 +1129,7 @@ mod tests {

// This tests that we actually initiate the backoff and the backoff mechanism works:
// * first call to `update_state` with invalid ProjectState starts the backoff, but since
-// it's the first attempt, we get Duration of 0.
+// it's the first attemt, we get Duration of 0.
// * second call to `update_state` here will bumpt the `next_backoff` Duration to somehing
// like ~ 1s
// * and now, by calling `fetch_state` we test that it's a noop, since if backoff is active
113 changes: 17 additions & 96 deletions relay-server/src/actors/project_cache.rs
@@ -519,16 +519,9 @@ impl ProjectCacheBroker {
let eviction_start = Instant::now();
let delta = 2 * self.config.project_cache_expiry() + self.config.project_grace_period();

-// Get all the projects which have not updated in the configured time and also have not gotten
-// any new incoming envelopes in the meantime.
-//
-// In case of an incident it can happen, that the ProjectState cannot get updated, and we
-// still getting incoming traffic - we want to keep the buffered envelopes as long as we
-// reasonable can (till the incident is resolved) and process them as usual.
-let expired = self.projects.drain_filter(|_, entry| {
-entry.last_updated_at() + delta <= eviction_start
-&& entry.last_envelope_seen_at() + delta <= eviction_start
-});
+let expired = self
+.projects
+.drain_filter(|_, entry| entry.last_updated_at() + delta <= eviction_start);

// Defer dropping the projects to a dedicated thread:
let mut count = 0;
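The eviction path here is built on hashbrown's `drain_filter`, which removes and yields every entry matching a predicate in a single pass (later hashbrown releases rename it to `extract_if`). A rough sketch of the pattern with a simplified, hypothetical entry type:

```rust
use std::time::{Duration, Instant};

use hashbrown::HashMap;

// Hypothetical, trimmed-down stand-in for the cached project entry.
struct Entry {
    last_updated_at: Instant,
}

// Remove every entry whose state is older than `delta` and report how many
// were evicted; all non-matching entries stay in the map untouched.
fn evict_stale(projects: &mut HashMap<u64, Entry>, delta: Duration) -> usize {
    let eviction_start = Instant::now();
    projects
        .drain_filter(|_, entry| entry.last_updated_at + delta <= eviction_start)
        .count()
}
```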
@@ -947,7 +940,6 @@ mod tests {
use relay_test::mock_service;
use uuid::Uuid;

-use crate::actors::project::ExpiryState;
use crate::testutils::empty_envelope;

use super::*;
@@ -973,12 +965,21 @@
}

async fn project_cache_broker_setup(
-config: Arc<Config>,
services: Services,
buffer_guard: Arc<BufferGuard>,
state_tx: mpsc::UnboundedSender<UpdateProjectState>,
buffer_tx: mpsc::UnboundedSender<ManagedEnvelope>,
) -> (ProjectCacheBroker, Addr<Buffer>) {
+let config: Arc<_> = Config::from_json_value(serde_json::json!({
+"spool": {
+"envelopes": {
+"path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
+"max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes.
+}
+}
+}))
+.unwrap()
+.into();
let buffer_services = spooler::Services {
outcome_aggregator: services.outcome_aggregator.clone(),
project_cache: services.project_cache.clone(),
@@ -1024,25 +1025,9 @@
let services = mocked_services();
let (state_tx, _) = mpsc::unbounded_channel();
let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
-let config = Config::from_json_value(serde_json::json!({
-"spool": {
-"envelopes": {
-"path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
-"max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes.
-}
-}
-}))
-.unwrap()
-.into();
-
-let (mut broker, buffer_svc) = project_cache_broker_setup(
-config,
-services.clone(),
-buffer_guard.clone(),
-state_tx,
-buffer_tx,
-)
-.await;
+let (mut broker, buffer_svc) =
+project_cache_broker_setup(services.clone(), buffer_guard.clone(), state_tx, buffer_tx)
+.await;

for _ in 0..8 {
let envelope = buffer_guard
@@ -1093,7 +1078,7 @@
envelopes.pop().unwrap();
assert_eq!(buffer_guard.available(), 1);

-// Till now we should have enqueued 5 envelopes and dequeued only 1, it means the index is
+// Till now we should have enqueued 5 envleopes and dequeued only 1, it means the index is
// still populated with same keys and values.
assert_eq!(broker.index.keys().len(), 1);
assert_eq!(broker.index.values().count(), 1);
@@ -1119,68 +1104,4 @@
assert!(buffer_rx.try_recv().is_err())
}
}

-#[tokio::test]
-async fn test_eviction() {
-tokio::time::pause();
-
-let num_permits = 5;
-let buffer_guard: Arc<_> = BufferGuard::new(num_permits).into();
-let services = mocked_services();
-let (state_tx, _) = mpsc::unbounded_channel();
-let (buffer_tx, _) = mpsc::unbounded_channel();
-
-// Projects should be expired after 2 seconds.
-let config: Arc<_> = Config::from_json_value(serde_json::json!({
-"cache": {
-"project_expiry": 1
-}
-}))
-.unwrap()
-.into();
-
-let (mut broker, _) = project_cache_broker_setup(
-config.clone(),
-services.clone(),
-buffer_guard,
-state_tx,
-buffer_tx,
-)
-.await;
-
-let key1 = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap();
-let project1 = Project::new(key1, config.clone());
-let key2 = ProjectKey::parse("eeed836b15bb49d7bbf99e64295d9bbb").unwrap();
-let project2 = Project::new(key2, config);
-
-broker.projects.insert(key1, project1);
-broker.projects.insert(key2, project2);
-
-for _ in 0..20 {
-let envelope = ManagedEnvelope::untracked(
-empty_envelope(),
-services.outcome_aggregator.clone(),
-services.test_store.clone(),
-);
-if let hashbrown::hash_map::Entry::Occupied(mut e) = broker.projects.entry(key2) {
-e.get_mut()
-.check_envelope(envelope, services.outcome_aggregator.clone())
-.unwrap();
-}
-tokio::time::advance(Duration::from_millis(100)).await;
-}
-
-// One of the project will be removed.
-broker.evict_stale_project_caches();
-// Project 1 did not receive any envelopes, and should be removed now.
-assert_eq!(broker.projects.len(), 1);
-assert!(broker.projects.get(&key1).is_none());
-
-// Project 2, even though expired, still must be in the cache.
-assert!(matches!(
-broker.projects.get(&key2).unwrap().expiry_state(),
-ExpiryState::Expired
-));
-assert!(broker.projects.get(&key2).is_some());
-}
}
