ref(kafka): Simplify producer types (#3462)
Remove `SingleProducer` and `ProducerInner` types.

Follow-up to #3415.

---------

Co-authored-by: David Herberth <david.herberth@sentry.io>
jjbayer and Dav1dde authored Apr 19, 2024
1 parent cad1967 commit 51c35c7
Showing 3 changed files with 80 additions and 122 deletions.
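The net effect, condensed from the diffs below into a before/after sketch (field lists are abbreviated here, not the complete definitions):

Before, both the config side and the producer side wrapped their data in single-variant containers:

    pub enum KafkaConfig<'a> {
        Single { params: KafkaParams<'a> },
    }

    enum ProducerInner {
        Single(SingleProducer),
    }

After, callers use `KafkaParams` and a flattened `Producer` directly:

    pub struct KafkaParams<'a> { /* topic_name, config_name, params */ }

    struct Producer {
        last_report: Cell<Instant>,
        topic_name: String,
        producer: Arc<ThreadedProducer>,
    }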
4 changes: 2 additions & 2 deletions relay-config/src/config.rs
@@ -11,7 +11,7 @@ use anyhow::Context;
 use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey};
 use relay_common::Dsn;
 use relay_kafka::{
-    ConfigError as KafkaConfigError, KafkaConfig, KafkaConfigParam, KafkaTopic, TopicAssignment,
+    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment,
     TopicAssignments,
 };
 use relay_metrics::aggregator::{AggregatorConfig, ShiftKey};
@@ -2236,7 +2236,7 @@ impl Config {
     }

     /// Configuration name and list of Kafka configuration parameters for a given topic.
-    pub fn kafka_config(&self, topic: KafkaTopic) -> Result<KafkaConfig, KafkaConfigError> {
+    pub fn kafka_config(&self, topic: KafkaTopic) -> Result<KafkaParams, KafkaConfigError> {
         self.values.processing.topics.get(topic).kafka_config(
             &self.values.processing.kafka_config,
             &self.values.processing.secondary_kafka_configs,
73 changes: 25 additions & 48 deletions relay-kafka/src/config.rs
@@ -171,18 +171,7 @@ pub struct KafkaTopicConfig {
     kafka_config_name: String,
 }

-/// Describes Kafka config, with all the parameters extracted, which will be used for creating the
-/// kafka producer.
-#[derive(Debug)]
-pub enum KafkaConfig<'a> {
-    /// Single config with Kafka parameters.
-    Single {
-        /// Kafka parameters to create the kafka producer.
-        params: KafkaParams<'a>,
-    },
-}
-
-/// Sharded Kafka config.
+/// Config for creating a Kafka producer.
 #[derive(Debug)]
 pub struct KafkaParams<'a> {
     /// The topic name to use.
@@ -208,26 +197,22 @@ impl TopicAssignment {
         &'a self,
         default_config: &'a Vec<KafkaConfigParam>,
         secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
-    ) -> Result<KafkaConfig<'_>, ConfigError> {
+    ) -> Result<KafkaParams<'_>, ConfigError> {
         let kafka_config = match self {
-            Self::Primary(topic_name) => KafkaConfig::Single {
-                params: KafkaParams {
-                    topic_name,
-                    config_name: None,
-                    params: default_config.as_slice(),
-                },
+            Self::Primary(topic_name) => KafkaParams {
+                topic_name,
+                config_name: None,
+                params: default_config.as_slice(),
             },
             Self::Secondary(KafkaTopicConfig {
                 topic_name,
                 kafka_config_name,
-            }) => KafkaConfig::Single {
-                params: KafkaParams {
-                    config_name: Some(kafka_config_name),
-                    topic_name,
-                    params: secondary_configs
-                        .get(kafka_config_name)
-                        .ok_or(ConfigError::UnknownKafkaConfigName)?,
-                },
+            }) => KafkaParams {
+                config_name: Some(kafka_config_name),
+                topic_name,
+                params: secondary_configs
+                    .get(kafka_config_name)
+                    .ok_or(ConfigError::UnknownKafkaConfigName)?,
             },
         };

@@ -289,11 +274,9 @@ transactions: "ingest-transactions-kafka-topic"
             .expect("Kafka config for events topic");
         assert!(matches!(
             events_config,
-            KafkaConfig::Single {
-                params: KafkaParams {
-                    topic_name: "ingest-events-kafka-topic",
-                    ..
-                }
+            KafkaParams {
+                topic_name: "ingest-events-kafka-topic",
+                ..
             }
         ));

@@ -302,12 +285,10 @@ transactions: "ingest-transactions-kafka-topic"
             .expect("Kafka config for profiles topic");
         assert!(matches!(
             events_config,
-            KafkaConfig::Single {
-                params: KafkaParams {
-                    topic_name: "ingest-profiles",
-                    config_name: Some("profiles"),
-                    ..
-                }
+            KafkaParams {
+                topic_name: "ingest-profiles",
+                config_name: Some("profiles"),
+                ..
             }
         ));

@@ -316,11 +297,9 @@ transactions: "ingest-transactions-kafka-topic"
             .expect("Kafka config for metrics topic");
         assert!(matches!(
             events_config,
-            KafkaConfig::Single {
-                params: KafkaParams {
-                    topic_name: "ingest-metrics-3",
-                    ..
-                }
+            KafkaParams {
+                topic_name: "ingest-metrics-3",
+                ..
             }
         ));

@@ -330,11 +309,9 @@ transactions: "ingest-transactions-kafka-topic"
             .expect("Kafka config for transactions topic");
         assert!(matches!(
             transactions_config,
-            KafkaConfig::Single {
-                params: KafkaParams {
-                    topic_name: "ingest-transactions-kafka-topic",
-                    ..
-                }
+            KafkaParams {
+                topic_name: "ingest-transactions-kafka-topic",
+                ..
             }
         ));
     }
125 changes: 53 additions & 72 deletions relay-kafka/src/producer/mod.rs
@@ -1,7 +1,4 @@
 //! This module contains the kafka producer related code.
-//!
-//! There are two different producers that are supported in Relay right now:
-//! - [`SingleProducer`] - which sends all the messages to the defined kafka [`KafkaTopic`],

 use std::borrow::Cow;
 use std::cell::Cell;
@@ -16,7 +13,7 @@ use rdkafka::ClientConfig;
 use relay_statsd::metric;
 use thiserror::Error;

-use crate::config::{KafkaConfig, KafkaParams, KafkaTopic};
+use crate::config::{KafkaParams, KafkaTopic};
 use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};

 mod utils;
@@ -80,16 +77,29 @@ pub trait Message {
 }

 /// Single kafka producer config with assigned topic.
-struct SingleProducer {
+struct Producer {
+    /// Time of the latest collection of stats.
+    last_report: Cell<Instant>,
     /// Kafka topic name.
     topic_name: String,
     /// Real kafka producer.
     producer: Arc<ThreadedProducer>,
 }

-impl fmt::Debug for SingleProducer {
+impl Producer {
+    fn new(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
+        Self {
+            last_report: Cell::new(Instant::now()),
+            topic_name,
+            producer,
+        }
+    }
+}
+
+impl fmt::Debug for Producer {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Single")
+        f.debug_struct("Producer")
+            .field("last_report", &self.last_report)
             .field("topic_name", &self.topic_name)
             .field("producer", &"<ThreadedProducer>")
             .finish()
@@ -177,44 +187,42 @@ impl KafkaClientBuilder {
     pub fn add_kafka_topic_config(
         mut self,
         topic: KafkaTopic,
-        config: &KafkaConfig,
+        params: &KafkaParams,
     ) -> Result<Self, ClientError> {
         let mut client_config = ClientConfig::new();
-        match config {
-            KafkaConfig::Single { params } => {
-                let KafkaParams {
-                    topic_name,
-                    config_name,
-                    params,
-                } = params;
-
-                let config_name = config_name.map(str::to_string);
-
-                if let Some(producer) = self.reused_producers.get(&config_name) {
-                    self.producers.insert(
-                        topic,
-                        Producer::single((*topic_name).to_string(), Arc::clone(producer)),
-                    );
-                    return Ok(self);
-                }
-
-                for config_p in *params {
-                    client_config.set(config_p.name.as_str(), config_p.value.as_str());
-                }
-
-                let producer = Arc::new(
-                    client_config
-                        .create_with_context(Context)
-                        .map_err(ClientError::InvalidConfig)?,
-                );
-
-                self.reused_producers
-                    .insert(config_name, Arc::clone(&producer));
-                self.producers
-                    .insert(topic, Producer::single((*topic_name).to_string(), producer));
-                Ok(self)
-            }
-        }
+        let KafkaParams {
+            topic_name,
+            config_name,
+            params,
+        } = params;
+
+        let config_name = config_name.map(str::to_string);
+
+        if let Some(producer) = self.reused_producers.get(&config_name) {
+            self.producers.insert(
+                topic,
+                Producer::new((*topic_name).to_string(), Arc::clone(producer)),
+            );
+            return Ok(self);
+        }
+
+        for config_p in *params {
+            client_config.set(config_p.name.as_str(), config_p.value.as_str());
+        }
+
+        let producer = Arc::new(
+            client_config
+                .create_with_context(Context)
+                .map_err(ClientError::InvalidConfig)?,
+        );
+
+        self.reused_producers
+            .insert(config_name, Arc::clone(&producer));
+        self.producers
+            .insert(topic, Producer::new((*topic_name).to_string(), producer));
+
+        Ok(self)
     }

     /// Consumes self and returns the built [`KafkaClient`].
@@ -236,30 +244,7 @@ impl fmt::Debug for KafkaClientBuilder {
     }
 }

-/// This object contains the Kafka producer variants for single.
-#[derive(Debug)]
-enum ProducerInner {
-    /// Configuration variant for the single kafka producer.
-    Single(SingleProducer),
-}
-
-#[derive(Debug)]
-struct Producer {
-    last_report: Cell<Instant>,
-    inner: ProducerInner,
-}
-
 impl Producer {
-    fn single(topic_name: String, producer: Arc<ThreadedProducer>) -> Self {
-        Self {
-            last_report: Instant::now().into(),
-            inner: ProducerInner::Single(SingleProducer {
-                topic_name,
-                producer,
-            }),
-        }
-    }
-
     /// Sends the payload to the correct producer for the current topic.
     fn send(
         &self,
@@ -272,12 +257,8 @@ impl Producer {
             histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64,
             variant = variant
         );
-        let (topic_name, producer) = match &self.inner {
-            ProducerInner::Single(SingleProducer {
-                topic_name,
-                producer,
-            }) => (topic_name.as_str(), producer.as_ref()),
-        };
+
+        let topic_name = self.topic_name.as_str();
         let mut record = BaseRecord::to(topic_name).key(key).payload(payload);

         // Make sure to set the headers if provided.
@@ -295,13 +276,13 @@ impl Producer {
         if self.last_report.get().elapsed() > REPORT_FREQUENCY {
             self.last_report.replace(Instant::now());
             metric!(
-                gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64,
+                gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64,
                 variant = variant,
                 topic = topic_name
             );
         }

-        producer
+        self.producer
             .send(record)
             .map(|_| topic_name)
             .map_err(|(error, _message)| {
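Taken together, the two public surfaces now line up: `Config::kafka_config` returns `KafkaParams`, which `KafkaClientBuilder::add_kafka_topic_config` accepts directly, with no single-variant enum to unwrap in between. A hypothetical wiring sketch; the `KafkaClient::builder()` entry point, the `?` error plumbing, and the `KafkaTopic::Events` variant are assumptions, not part of this commit:

    // Sketch only: the builder entry point and topic variant are assumed.
    let params = config.kafka_config(KafkaTopic::Events)?; // now returns KafkaParams directly
    let client = KafkaClient::builder()
        .add_kafka_topic_config(KafkaTopic::Events, &params)? // takes &KafkaParams
        .build();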
