-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(spooler): Implement eviction mechanism when buffer is full #4014
Conversation
26154ba
to
88a9fb5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, aside from some nits.
#[derive(Debug, Copy, Clone)] | ||
struct LRUItem(ProjectKeyPair, Readiness, Instant); | ||
|
||
impl PartialEq for LRUItem { | ||
fn eq(&self, other: &Self) -> bool { | ||
self.0 == other.0 | ||
} | ||
} | ||
|
||
impl PartialOrd for LRUItem { | ||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||
Some(self.cmp(other)) | ||
} | ||
} | ||
|
||
impl Eq for LRUItem {} | ||
|
||
impl Ord for LRUItem { | ||
fn cmp(&self, other: &Self) -> Ordering { | ||
match (self.1.ready(), other.1.ready()) { | ||
(true, false) => Ordering::Greater, | ||
(false, true) => Ordering::Less, | ||
_ => self.2.cmp(&other.2), | ||
} | ||
} | ||
} | ||
|
||
// We calculate how many envelope stacks we want to evict. | ||
let max_lru_length = | ||
((self.priority_queue.len() as f32) * self.evictable_stacks_percentage).ceil() as usize; | ||
relay_log::trace!( | ||
"Evicting {} elements from the envelope buffer", | ||
max_lru_length | ||
); | ||
|
||
let mut lru: BinaryHeap<LRUItem> = BinaryHeap::with_capacity(max_lru_length); | ||
relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), { | ||
for (queue_item, priority) in self.priority_queue.iter() { | ||
let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); | ||
|
||
// If we exceed the size, we want to pop the greatest element only if we have a smaller | ||
// element, so that we end up with the smallest elements which are the ones with the | ||
// lowest priority. | ||
if lru.len() >= max_lru_length { | ||
let Some(top_lru_item) = lru.peek() else { | ||
continue; | ||
}; | ||
|
||
if lru_item < *top_lru_item { | ||
lru.pop(); | ||
} | ||
} | ||
|
||
lru.push(lru_item); | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if instead of building this heap, we could just start evicting from the back of vector that the priority queue.
There's no guarantee that that would affect the least-priority stacks (because a priority is not sorted), and the last_peek
logic makes the priority even less reliable as indicator. But since evicting is a last-resort measure that will always drop data, popping from the back of the pq vector might be good enough, and would simplify a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you would do a reverse iterator and just delete x
elements?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, pretty much. But your solution is definitely more correct, and it respects the LIFO policy, so I'm ambivalent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would keep my solution to keep the implementation more predictable.
We decided for now not to proceed with implementing eviction. |
This PR implements an eviction algorithm that can be triggered via the evict method call on the EnvelopeBuffer. The eviction policy aims to evict all the envelope stacks that are not ready or have been stale for the longest time.
The eviction is triggered every 100ms and it's tried based on the result of the capacity check. In case the capacity fails, the buffer will try to evict some envelope stacks and connected envelopes to free up some space. This is done assuming that we want to gracefully handle new incoming load in case of high memory/disk usage by penalizing old data.
Closes: https://github.com/getsentry/team-ingest/issues/518