Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a basic metric for tracking the capacity in VecDeque buffer #383

Merged
merged 9 commits into from
Sep 11, 2024
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion rust-arroyo/src/processing/dlq.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use rand::seq::index::sample;
use rand::Rng;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
Expand All @@ -11,6 +13,7 @@ use tokio::task::JoinHandle;
use crate::backends::kafka::producer::KafkaProducer;
use crate::backends::kafka::types::KafkaPayload;
use crate::backends::Producer;
use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic, TopicOrPartition};

// This is a per-partition max
Expand Down Expand Up @@ -398,10 +401,21 @@ impl<TPayload> BufferedMessages<TPayload> {
///
/// If the configured `max_per_partition` is `0`, this is a no-op.
pub fn append(&mut self, message: BrokerMessage<TPayload>) {
let mut rng = rand::thread_rng();
let metric_prob: f64 = rng.gen();

if self.max_per_partition == Some(0) {
return;
}

if metric_prob <= 0.01 {
// Number of partitions in the buffer map
gauge!(
"arroyo.consumer.dlq_buffer.assigned_partitions",
self.buffered_messages.len() as u64,
);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is repeated several times. Can this be moved to a function?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice this code. In fact, this sampling should not be required at all. it has some performance benefits in python but they should not be present for rust. let's remove the sampling entirely.

let buffered = self.buffered_messages.entry(message.partition).or_default();
if let Some(max) = self.max_per_partition {
if buffered.len() >= max {
Expand All @@ -414,22 +428,57 @@ impl<TPayload> BufferedMessages<TPayload> {
}

buffered.push_back(message);

// Number of elements that can be held in buffer deque without reallocating
if metric_prob <= 0.01 {
// Number of partitions in the buffer map
gauge!(
"arroyo.consumer.dlq_buffer.assigned_partitions",
self.buffered_messages.len() as u64,
);
}
}

/// Return the message at the given offset or None if it is not found in the buffer.
/// Messages up to the offset for the given partition are removed.
pub fn pop(&mut self, partition: &Partition, offset: u64) -> Option<BrokerMessage<TPayload>> {
let mut rng = rand::thread_rng();
let metric_prob: f64 = rng.gen();

// Number of partitions in the buffer map
if metric_prob <= 0.01 {
gauge!(
"arroyo.consumer.dlq_buffer.assigned_partitions",
self.buffered_messages.len() as u64,
);
}

let messages = self.buffered_messages.get_mut(partition)?;
while let Some(message) = messages.front() {
match message.offset.cmp(&offset) {
Ordering::Equal => {
return messages.pop_front();
let first = messages.pop_front();

if metric_prob <= 0.01 {
gauge!(
"arroyo.consumer.dlq_buffer.capacity",
messages.capacity() as u64
);
}
return first;
}
Ordering::Greater => {
return None;
}
Ordering::Less => {
messages.pop_front();

if metric_prob <= 0.01 {
gauge!(
"arroyo.consumer.dlq_buffer.capacity",
messages.capacity() as u64
);
}
}
};
}
Expand Down
Loading