fix(server): Avoid a race condition in dequeuing after global config (#2753)

`RequestUpdate` cannot be sent directly; we need to go through
`Project::get_or_fetch_state` to ensure that internal invariants are not
violated. In this instance, the message does not create a required
channel in the `Project` instance.
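To make that invariant concrete, here is a heavily simplified, self-contained sketch of the shape of the fix. The `ProjectKey`, `ProjectState`, `Project`, and `Broker` types below are illustrative stubs, not the real relay-server definitions, and the real `get_cached_state` additionally takes the project-cache handle and a `no_cache` flag: the broker only dequeues once a valid cached state exists, and it always obtains that state through the project entry rather than by sending `RequestUpdate` on its own.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real relay-server types; names and fields
// here are illustrative only.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ProjectKey(u64);

#[derive(Clone, Debug)]
struct ProjectState {
    invalid: bool,
}

impl ProjectState {
    fn invalid(&self) -> bool {
        self.invalid
    }
}

#[derive(Default)]
struct Project {
    cached_state: Option<ProjectState>,
}

impl Project {
    // In the real code, this lookup also sets up the internal state channel
    // and may spawn a background fetch; here it only returns what is cached.
    fn get_cached_state(&mut self) -> Option<ProjectState> {
        self.cached_state.clone()
    }
}

#[derive(Default)]
struct Broker {
    projects: HashMap<ProjectKey, Project>,
}

impl Broker {
    fn get_or_create_project(&mut self, key: ProjectKey) -> &mut Project {
        self.projects.entry(key).or_default()
    }

    fn dequeue(&mut self, key: ProjectKey) {
        println!("dequeueing buffered envelopes for {key:?}");
    }

    // Mirrors the shape of the fix: only dequeue when a valid cached state
    // exists; otherwise rely on the fetch triggered by the state lookup to
    // dequeue later.
    fn on_global_config(&mut self, pending: Vec<ProjectKey>) {
        for key in pending {
            let state = self.get_or_create_project(key).get_cached_state();
            if state.map_or(false, |s| !s.invalid()) {
                self.dequeue(key);
            }
        }
    }
}

fn main() {
    let mut broker = Broker::default();
    broker.get_or_create_project(ProjectKey(1)).cached_state =
        Some(ProjectState { invalid: false });

    // Project 1 has a valid cached state and is dequeued immediately; project 2
    // has none, so in the real system a background fetch would dequeue it once
    // `UpdateProjectState` arrives at the broker.
    broker.on_global_config(vec![ProjectKey(1), ProjectKey(2)]);
}
```

In the real code, the state lookup is also what triggers the background fetch, after which `UpdateProjectState` causes the deferred dequeue.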
jan-auer authored Nov 22, 2023
1 parent abd0eff commit f1ee94e
Showing 3 changed files with 32 additions and 24 deletions.
39 changes: 25 additions & 14 deletions relay-server/src/actors/project_cache.rs
@@ -490,23 +490,34 @@ impl GlobalConfigStatus {
 
 impl ProjectCacheBroker {
     fn set_global_config(&mut self, global_config: Arc<GlobalConfig>) {
-        if let GlobalConfigStatus::Pending(project_keys) = std::mem::replace(
+        let GlobalConfigStatus::Pending(project_keys) = std::mem::replace(
             &mut self.global_config,
             GlobalConfigStatus::Ready(global_config),
-        ) {
-            relay_log::info!(
-                "global config received: {} project were pending",
-                project_keys.len()
-            );
-            // When the global config arrives, we will request new ProjectStates for all the
-            // projects. This will trigger dequeuing of that we deferred in ProjectCacheBroker::dequeue.
-            for project_key in project_keys {
-                let message = RequestUpdate {
-                    project_key,
-                    no_cache: false,
-                };
+        ) else {
+            return;
+        };
+
+        relay_log::info!(
+            "global config received: {} projects were pending",
+            project_keys.len()
+        );
 
-                self.services.project_cache.send(message)
+        // Check which of the pending projects can be dequeued now:
+        for project_key in project_keys {
+            let project_cache = self.services.project_cache.clone();
+
+            // Check for a cached state:
+            // 1. If the state is cached and valid, trigger an immediate dequeue. There could be a
+            //    fetch running in the background, but this is not guaranteed.
+            // 2. If the state is not cached, `get_cached_state` will trigger a fetch in the
+            //    background. Once the fetch completes, the project will automatically cause a
+            //    dequeue by sending `UpdateProjectState` to the broker.
+            let state = self
+                .get_or_create_project(project_key)
+                .get_cached_state(project_cache, false);
+
+            if state.map_or(false, |s| !s.invalid()) {
+                self.dequeue(project_key);
             }
         }
     }
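One stylistic point in the hunk above: the `if let GlobalConfigStatus::Pending(..)` wrapper becomes a `let ... else` early return (stable since Rust 1.65), which keeps the rest of the function at a single indentation level. A minimal, standalone illustration of that pattern, using a made-up `Status` enum rather than the relay types:

```rust
enum Status {
    Pending(Vec<String>),
    Ready,
}

fn process(status: Status) {
    // `let ... else` either binds the pattern or diverges; the happy path
    // below is not nested inside an `if let` block.
    let Status::Pending(keys) = status else {
        return;
    };

    for key in keys {
        println!("processing pending key {key}");
    }
}

fn main() {
    process(Status::Pending(vec!["a".into(), "b".into()]));
    process(Status::Ready);
}
```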
15 changes: 6 additions & 9 deletions tests/integration/test_envelope.py
@@ -623,12 +623,9 @@ def get_project_config():
     finally:
         mini_sentry.test_failures.clear()
 
-    try:
-        envelopes = []
-        # Check that we received exactly {envelope_qty} envelopes.
-        for _ in range(envelope_qty):
-            envelopes.append(events_consumer.get_event(timeout=2))
-        events_consumer.assert_empty()
-        assert len(envelopes) == envelope_qty
-    finally:
-        mini_sentry.test_failures.clear()
+    envelopes = []
+    # Check that we received exactly {envelope_qty} envelopes.
+    for _ in range(envelope_qty):
+        envelopes.append(events_consumer.get_event(timeout=2))
+    events_consumer.assert_empty()
+    assert len(envelopes) == envelope_qty
2 changes: 1 addition & 1 deletion tests/integration/test_query.py
@@ -142,7 +142,7 @@ def get_project_config():

     event = mini_sentry.captured_events.get(timeout=12).get_event()
     assert event["logentry"] == {"formatted": "Hello, World!"}
-    assert retry_count == 4
+    assert retry_count == 3
 
     if mini_sentry.test_failures:
         for _, error in mini_sentry.test_failures: