Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a basic metric for tracking the capacity in VecDeque buffer #383

Merged
merged 9 commits into from
Sep 11, 2024
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion rust-arroyo/src/processing/dlq.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use rand::seq::index::sample;
use rand::Rng;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
Expand All @@ -11,6 +13,7 @@ use tokio::task::JoinHandle;
use crate::backends::kafka::producer::KafkaProducer;
use crate::backends::kafka::types::KafkaPayload;
use crate::backends::Producer;
use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic, TopicOrPartition};

// This is a per-partition max
Expand Down Expand Up @@ -402,6 +405,12 @@ impl<TPayload> BufferedMessages<TPayload> {
return;
}

// Number of partitions in the buffer map
gauge!(
"arroyo.consumer.dlq_buffer.assigned_partitions",
self.buffered_messages.len() as u64,
);

let buffered = self.buffered_messages.entry(message.partition).or_default();
if let Some(max) = self.max_per_partition {
if buffered.len() >= max {
Expand All @@ -414,22 +423,46 @@ impl<TPayload> BufferedMessages<TPayload> {
}

buffered.push_back(message);

// Number of elements that can be held in buffer deque without reallocating
gauge!(
"arroyo.consumer.dlq_buffer.capacity",
buffered.capacity() as u64
);
}

/// Return the message at the given offset or None if it is not found in the buffer.
/// Messages up to the offset for the given partition are removed.
pub fn pop(&mut self, partition: &Partition, offset: u64) -> Option<BrokerMessage<TPayload>> {
// Number of partitions in the buffer map
gauge!(
"arroyo.consumer.dlq_buffer.assigned_partitions",
self.buffered_messages.len() as u64,
);

let messages = self.buffered_messages.get_mut(partition)?;
while let Some(message) = messages.front() {
match message.offset.cmp(&offset) {
Ordering::Equal => {
return messages.pop_front();
let first = messages.pop_front();

gauge!(
"arroyo.consumer.dlq_buffer.capacity",
messages.capacity() as u64
);

return first;
}
Ordering::Greater => {
return None;
}
Ordering::Less => {
messages.pop_front();

gauge!(
"arroyo.consumer.dlq_buffer.capacity",
messages.capacity() as u64
);
}
};
}
Expand Down
Loading