Skip to content

Commit

Permalink
fix(spooler): Proactively populate the project cache for envelopes sp…
Browse files Browse the repository at this point in the history
…ooled to disk (#2962)

When Relay restarts, the index of the envelopes spooled to disk must
be restored:
* read all the project keys we spooled to disk
* generate a unique index
* make sure the project cache fetches them

Once the project state is received, Relay will start to unspool the
envelopes saved to disk.

This should allow Relay to unspool faster, even for projects that do
not have high-volume traffic.
  • Loading branch information
olksdr authored Jan 22, 2024
1 parent 906c75f commit 57de7a1
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 13 deletions.
49 changes: 40 additions & 9 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::services::project_local::{LocalProjectSource, LocalProjectSourceServi
use crate::services::project_redis::RedisProjectSource;
use crate::services::project_upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
use crate::services::spooler::{
self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany,
self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex,
};
use crate::services::test_store::TestStore;
use crate::services::upstream::UpstreamRelay;
Expand Down Expand Up @@ -180,12 +180,12 @@ pub struct AddMetricMeta {
/// This message is sent from the project buffer in case of the error while fetching the data from
/// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the
/// persistent storage.
pub struct UpdateBufferIndex {
pub struct UpdateSpoolIndex {
    /// The project key whose spool index entry is being updated.
    project_key: ProjectKey,
    /// Queue keys still present in the persistent spool for this project;
    /// stored into the broker's in-memory index on receipt.
    keys: BTreeSet<QueueKey>,
}

impl UpdateBufferIndex {
impl UpdateSpoolIndex {
pub fn new(project_key: ProjectKey, keys: BTreeSet<QueueKey>) -> Self {
Self { project_key, keys }
}
Expand All @@ -195,6 +195,13 @@ impl UpdateBufferIndex {
/// A health check for the underlying envelope spool.
///
/// Dispatched together with a `Sender<bool>` (see the `SpoolHealth` variant of
/// `ProjectCache`) through which the result is reported; the check itself is
/// forwarded to the spooler service.
#[derive(Debug)]
pub struct SpoolHealth;

/// The current envelopes index fetched from the underlying buffer spool.
///
/// This index is received only once, shortly after startup, and triggers a refresh of
/// the project states for the project keys contained in the message.
#[derive(Debug)]
pub struct RefreshIndexCache(pub BTreeMap<ProjectKey, BTreeSet<QueueKey>>);

/// A cache for [`ProjectState`]s.
///
/// The project maintains information about organizations, projects, and project keys along with
Expand Down Expand Up @@ -223,17 +230,26 @@ pub enum ProjectCache {
MergeBuckets(MergeBuckets),
AddMetricMeta(AddMetricMeta),
FlushBuckets(FlushBuckets),
UpdateBufferIndex(UpdateBufferIndex),
UpdateSpoolIndex(UpdateSpoolIndex),
SpoolHealth(Sender<bool>),
RefreshIndexCache(RefreshIndexCache),
}

impl Interface for ProjectCache {}

impl FromMessage<UpdateBufferIndex> for ProjectCache {
// Routes `UpdateSpoolIndex` into the `ProjectCache` service interface.
// Fire-and-forget: `NoResponse` means the sender gets nothing back.
impl FromMessage<UpdateSpoolIndex> for ProjectCache {
    type Response = relay_system::NoResponse;

    fn from_message(message: UpdateSpoolIndex, _: ()) -> Self {
        Self::UpdateSpoolIndex(message)
    }
}

impl FromMessage<RefreshIndexCache> for ProjectCache {
type Response = relay_system::NoResponse;

fn from_message(message: UpdateBufferIndex, _: ()) -> Self {
Self::UpdateBufferIndex(message)
fn from_message(message: RefreshIndexCache, _: ()) -> Self {
Self::RefreshIndexCache(message)
}
}

Expand Down Expand Up @@ -878,14 +894,25 @@ impl ProjectCacheBroker {
.send(EncodeMetrics { scopes: output })
}

fn handle_buffer_index(&mut self, message: UpdateBufferIndex) {
fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) {
self.index.insert(message.project_key, message.keys);
}

/// Forwards a health-check request to the spooler service.
///
/// The spooler reports its status back through the provided `sender`.
fn handle_spool_health(&mut self, sender: Sender<bool>) {
    let health_check = spooler::Health(sender);
    self.buffer.send(health_check)
}

/// Merges the index restored from the on-disk spool into the in-memory index.
///
/// For every project key in the restored index, the queue keys are added to the
/// existing entry and a project-state prefetch is kicked off, so that the spooled
/// envelopes can be unspooled once the states arrive.
fn handle_refresh_index_cache(&mut self, message: RefreshIndexCache) {
    let RefreshIndexCache(index) = message;
    let project_cache = self.services.project_cache.clone();

    for (project_key, queue_keys) in index {
        self.index
            .entry(project_key)
            .or_default()
            .extend(queue_keys);

        let project = self.get_or_create_project(project_key);
        project.prefetch(project_cache.clone(), false);
    }
}

fn handle_message(&mut self, message: ProjectCache) {
match message {
ProjectCache::RequestUpdate(message) => self.handle_request_update(message),
Expand All @@ -901,8 +928,9 @@ impl ProjectCacheBroker {
ProjectCache::MergeBuckets(message) => self.handle_merge_buckets(message),
ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message),
ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message),
ProjectCache::UpdateBufferIndex(message) => self.handle_buffer_index(message),
ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message),
ProjectCache::SpoolHealth(sender) => self.handle_spool_health(sender),
ProjectCache::RefreshIndexCache(message) => self.handle_refresh_index_cache(message),
}
}
}
Expand Down Expand Up @@ -996,6 +1024,9 @@ impl Service for ProjectCacheService {
}
};

// Request the existing index from the spooler.
buffer.send(RestoreIndex);

// Main broker that serializes public and internal messages, and triggers project state
// fetches via the project source.
let mut broker = ProjectCacheBroker {
Expand Down
Loading

0 comments on commit 57de7a1

Please sign in to comment.