chore(spool): Improve unspool strategy (#3047)
Recent stats show that when we have many records with only 1 envelope per key, unspooling is very slow, since the service has to go to the DB and fetch 1 key at a time. This can block the `Buffer`, and accumulating the messages in the internal queue can also use up more memory.

This change introduces batching for all the valid keys, which makes sure we can unspool many records at once. This decreases the number of messages sent between the services and also reads more data from disk in one go.

Make sure we can unspool faster from the buffer:
* request all the envelopes for all the valid projects
* batch those requests for all the incoming keys
* return keys to the index when the buffer is over the low watermark (see the sketch below)
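
Conceptually, the new unspool pass drains every key whose project states are valid out of the index in one go, sends that whole batch to the spooler, and returns the remaining keys to the index for a later pass. A minimal sketch of that split, assuming simplified `u64` keys and a caller-supplied validity check (the `split_index` helper is hypothetical; the real code uses `ProjectKey`s, `hashbrown::HashSet::drain_filter`, and the `DequeueMany` message):

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct QueueKey {
    own_key: u64,
    sampling_key: u64,
}

/// Drains the keys whose project states are valid out of the index so they
/// can be unspooled in one batch; everything else stays in the index.
fn split_index(
    index: &mut HashSet<QueueKey>,
    is_state_valid: impl Fn(&QueueKey) -> bool,
) -> HashSet<QueueKey> {
    // Take the whole index, keep the valid keys, and put the rest back.
    let (valid, rest): (HashSet<_>, HashSet<_>) =
        std::mem::take(index).into_iter().partition(is_state_valid);
    index.extend(rest);
    valid
}

fn main() {
    let mut index: HashSet<QueueKey> = [
        QueueKey { own_key: 1, sampling_key: 1 },
        QueueKey { own_key: 2, sampling_key: 3 },
    ]
    .into_iter()
    .collect();

    // Pretend only project 1 has a valid cached state right now.
    let valid = split_index(&mut index, |key| key.own_key == 1 && key.sampling_key == 1);

    // All valid keys go to the spooler in one batched request instead of one
    // request per project key; the rest wait for the next unspool tick.
    assert_eq!(valid.len(), 1);
    assert_eq!(index.len(), 1);
}
```

Batching this way means the project cache sends a single dequeue request per unspool tick, and the spooler can fetch many keys from disk in one go instead of one key at a time.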
olksdr committed Feb 12, 2024
1 parent d63d3ed commit 4e57b1f
Showing 3 changed files with 281 additions and 179 deletions.
184 changes: 87 additions & 97 deletions relay-server/src/services/project_cache.rs
@@ -3,6 +3,7 @@ use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
use relay_config::{Config, RelayMode};
use relay_dynamic_config::GlobalConfig;
@@ -183,14 +184,11 @@ pub struct AddMetricMeta {
/// This message is sent from the project buffer in case of an error while fetching the data from
/// the persistent buffer, ensuring that we still have the index pointing to the keys, which can be
/// found in persistent storage.
pub struct UpdateSpoolIndex {
project_key: ProjectKey,
keys: BTreeSet<QueueKey>,
}
pub struct UpdateSpoolIndex(pub HashSet<QueueKey>);

impl UpdateSpoolIndex {
pub fn new(project_key: ProjectKey, keys: BTreeSet<QueueKey>) -> Self {
Self { project_key, keys }
pub fn new(keys: HashSet<QueueKey>) -> Self {
Self(keys)
}
}

@@ -203,7 +201,7 @@ pub struct SpoolHealth;
/// This index will be received only once shortly after startup and will trigger refresh for the
/// project states for the project keys returned in the message.
#[derive(Debug)]
pub struct RefreshIndexCache(pub BTreeMap<ProjectKey, BTreeSet<QueueKey>>);
pub struct RefreshIndexCache(pub HashSet<QueueKey>);

/// A cache for [`ProjectState`]s.
///
@@ -492,7 +490,7 @@ struct ProjectCacheBroker {
buffer_tx: mpsc::UnboundedSender<ManagedEnvelope>,
buffer_guard: Arc<BufferGuard>,
/// Index of the buffered project keys.
index: BTreeMap<ProjectKey, BTreeSet<QueueKey>>,
index: HashSet<QueueKey>,
buffer_unspool_handle: SleepHandle,
buffer_unspool_backoff: RetryBackoff,
buffer: Addr<Buffer>,
@@ -524,58 +522,17 @@ impl ProjectCacheBroker {

/// Adds the value to the queue for the provided key.
pub fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) {
self.index.entry(key.own_key).or_default().insert(key);
self.index.entry(key.sampling_key).or_default().insert(key);
self.index.insert(key);
self.buffer.send(Enqueue::new(key, value));
}

/// Sends the message to the buffer service to dequeue the envelopes.
///
/// All the found envelopes will be sent back through the `buffer_tx` channel and directly
/// forwarded to `handle_processing`.
pub fn dequeue(&mut self, partial_key: ProjectKey) {
// If we don't yet have the global config, we will defer dequeuing until we do.
if let GlobalConfigStatus::Pending = self.global_config {
return;
}

let mut result = Vec::new();
let mut queue_keys = self.index.remove(&partial_key).unwrap_or_default();
let mut index = BTreeSet::new();

while let Some(queue_key) = queue_keys.pop_first() {
// We only have to check `other_key`, because we already know that the `partial_key`s `state`
// is valid and loaded.
let other_key = if queue_key.own_key == partial_key {
queue_key.sampling_key
} else {
queue_key.own_key
};

if self
.projects
.get(&other_key)
// Make sure we have only cached and valid state.
.and_then(|p| p.valid_state())
.map_or(false, |s| !s.invalid())
{
result.push(queue_key);
} else {
index.insert(queue_key);
}
}

if !index.is_empty() {
self.index.insert(partial_key, index);
}

if !result.is_empty() {
self.buffer.send(DequeueMany::new(
partial_key,
result,
self.buffer_tx.clone(),
))
}
pub fn dequeue(&self, keys: HashSet<QueueKey>) {
self.buffer
.send(DequeueMany::new(keys, self.buffer_tx.clone()))
}

/// Evict projects that are over their expiry date.
@@ -595,7 +552,12 @@ impl ProjectCacheBroker {
// Defer dropping the projects to a dedicated thread:
let mut count = 0;
for (project_key, project) in expired {
if let Some(keys) = self.index.remove(&project_key) {
let keys = self
.index
.drain_filter(|key| key.own_key == project_key || key.sampling_key == project_key)
.collect::<BTreeSet<_>>();

if !keys.is_empty() {
self.buffer.send(RemoveMany::new(project_key, keys))
}

@@ -868,7 +830,7 @@ impl ProjectCacheBroker {
}

fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) {
self.index.insert(message.project_key, message.keys);
self.index.extend(message.0);
}

fn handle_spool_health(&mut self, sender: Sender<bool>) {
@@ -879,10 +841,14 @@ impl ProjectCacheBroker {
let RefreshIndexCache(index) = message;
let project_cache = self.services.project_cache.clone();

for (key, values) in index {
self.index.entry(key).or_default().extend(values);
self.get_or_create_project(key)
for key in index {
self.index.insert(key);
self.get_or_create_project(key.own_key)
.prefetch(project_cache.clone(), false);
if key.own_key != key.sampling_key {
self.get_or_create_project(key.sampling_key)
.prefetch(project_cache.clone(), false);
}
}
}

@@ -899,6 +865,44 @@ impl ProjectCacheBroker {
}
}

/// Returns `true` if the project state is valid for the [`QueueKey`].
///
/// This includes both the own key and the sampling key for the project.
/// Note: this function will trigger a [`ProjectState`] refresh if it's already expired or not
/// valid.
fn is_state_valid(&mut self, key: &QueueKey) -> bool {
let QueueKey {
own_key,
sampling_key,
} = key;

let is_own_state_valid = self.projects.get_mut(own_key).map_or(false, |project| {
// Returns `Some` if the project is cached otherwise None and also triggers refresh
// in background.
project
.get_cached_state(self.services.project_cache.clone(), false)
// Makes sure that the state also is valid.
.map_or(false, |state| !state.invalid())
});

let is_sampling_state_valid = if own_key != sampling_key {
self.projects
.get_mut(sampling_key)
.map_or(false, |project| {
// Returns `Some` if the project is cached otherwise None and also triggers refresh
// in background.
project
.get_cached_state(self.services.project_cache.clone(), false)
// Makes sure that the state also is valid.
.map_or(false, |state| !state.invalid())
})
} else {
is_own_state_valid
};

is_own_state_valid && is_sampling_state_valid
}

/// Iterates the buffer index and tries to unspool the envelopes for projects with a valid
/// state.
///
@@ -907,40 +911,33 @@ impl ProjectCacheBroker {
fn handle_periodic_unspool(&mut self) {
self.buffer_unspool_handle.reset();

// If we don't yet have the global config, we will defer dequeuing until we do.
if let GlobalConfigStatus::Pending = self.global_config {
self.buffer_unspool_backoff.reset();
self.schedule_unspool();
return;
}
// If there is nothing spooled, schedule the next check a little bit later.
if self.index.is_empty() {
// And do *not* attempt to unspool if the assigned permits are over the low watermark.
if self.index.is_empty() || !self.buffer_guard.is_below_low_watermark() {
self.schedule_unspool();
return;
}

let keys = self.index.keys().cloned().collect::<Box<[_]>>();
let mut dequeued = false;

for project_key in keys.iter() {
if self.projects.get_mut(project_key).map_or(false, |project| {
// Returns `Some` if the project is cache otherwise None and also triggers refresh
// in background.
project
.get_cached_state(self.services.project_cache.clone(), false)
// Makes sure that the state also is valid.
.map_or(false, |state| !state.invalid())
}) {
// Do *not* attempt to unspool if the assigned permits over low watermark.
if !self.buffer_guard.is_below_low_watermark() {
self.schedule_unspool();
return;
}
let mut index = std::mem::take(&mut self.index);
let values = index
.drain_filter(|key| self.is_state_valid(key))
.collect::<HashSet<_>>();

self.dequeue(*project_key);
dequeued = true;
}
if !values.is_empty() {
self.dequeue(values);
}

// If cannot dequeue for some reason, back off the next retry.
if dequeued {
self.buffer_unspool_backoff.reset();
}
// Return all the unused items to the index.
self.index.extend(index);

// Schedule unspool once we are done.
self.buffer_unspool_backoff.reset();
self.schedule_unspool();
}

@@ -1073,7 +1070,7 @@ impl Service for ProjectCacheService {
state_tx,
buffer_tx,
buffer_guard,
index: BTreeMap::new(),
index: HashSet::new(),
buffer_unspool_handle: SleepHandle::idle(),
buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()),
buffer,
@@ -1208,7 +1205,7 @@ mod tests {
state_tx,
buffer_tx,
buffer_guard,
index: BTreeMap::new(),
index: HashSet::new(),
buffer: buffer.clone(),
global_config: GlobalConfigStatus::Pending,
buffer_unspool_handle: SleepHandle::idle(),
@@ -1250,8 +1247,7 @@ mod tests {

// All the messages should have been spooled to disk.
assert_eq!(buffer_guard.available(), 5);
assert_eq!(broker.index.keys().len(), 1);
assert_eq!(broker.index.values().count(), 1);
assert_eq!(broker.index.len(), 1);

let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap();
let key = QueueKey {
@@ -1261,11 +1257,7 @@ mod tests {
let (tx, mut rx) = mpsc::unbounded_channel();

// Check if we can also dequeue from the buffer directly.
buffer_svc.send(spooler::DequeueMany::new(
project_key,
[key].into(),
tx.clone(),
));
buffer_svc.send(spooler::DequeueMany::new([key].into(), tx.clone()));
tokio::time::sleep(Duration::from_millis(100)).await;

// We should be able to unspool 5 envelopes since we have 5 permits.
@@ -1283,11 +1275,10 @@ mod tests {

// Till now we should have enqueued 5 envelopes and dequeued only 1, it means the index is
// still populated with same keys and values.
assert_eq!(broker.index.keys().len(), 1);
assert_eq!(broker.index.values().count(), 1);
assert_eq!(broker.index.len(), 1);

// Check if we can also dequeue from the buffer directly.
buffer_svc.send(spooler::DequeueMany::new(project_key, [key].into(), tx));
buffer_svc.send(spooler::DequeueMany::new([key].into(), tx));
tokio::time::sleep(Duration::from_millis(100)).await;
// Cannot dequeue anymore, no more available permits.
assert!(rx.try_recv().is_err());
@@ -1353,8 +1344,7 @@ mod tests {
select! {

Some(assert) = rx_assert.recv() => {
assert_eq!(broker.index.keys().len(), assert);
assert_eq!(broker.index.values().count(), assert);
assert_eq!(broker.index.len(), assert);
},
Some(update) = rx_update.recv() => broker.merge_state(update),
() = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(),