Skip to content

Commit

Permalink
feat: Ingest a metric for rdkafka queue size (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
phacops authored Mar 5, 2024
1 parent eb660ca commit 56d361a
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions rust-arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::CommitOffsets;
use super::Consumer as ArroyoConsumer;
use super::ConsumerError;
use crate::backends::kafka::types::KafkaPayload;
use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic};
use chrono::{DateTime, NaiveDateTime, Utc};
use parking_lot::Mutex;
Expand All @@ -15,6 +16,7 @@ use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, Message};
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use rdkafka::types::{RDKafkaErrorCode, RDKafkaRespErr};
use rdkafka::Statistics;
use sentry::Hub;
use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -167,6 +169,13 @@ impl<C: AssignmentCallbacks + Send + Sync> ClientContext for CustomContext<C> {
tracing::error!(error, "librdkafka: {error}: {reason}");
})
}

fn stats(&self, stats: Statistics) {
gauge!(
"arroyo.consumer.librdkafka.total_queue_size",
stats.replyq as u64,
);
}
}

impl<C: AssignmentCallbacks> ConsumerContext for CustomContext<C> {
Expand Down

0 comments on commit 56d361a

Please sign in to comment.