diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 521a560986..c13b019992 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -11,7 +11,7 @@ use anyhow::Context; use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey}; use relay_common::Dsn; use relay_kafka::{ - ConfigError as KafkaConfigError, KafkaConfig, KafkaConfigParam, KafkaTopic, TopicAssignment, + ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment, TopicAssignments, }; use relay_metrics::aggregator::{AggregatorConfig, ShiftKey}; @@ -2236,7 +2236,7 @@ impl Config { } /// Configuration name and list of Kafka configuration parameters for a given topic. - pub fn kafka_config(&self, topic: KafkaTopic) -> Result { + pub fn kafka_config(&self, topic: KafkaTopic) -> Result { self.values.processing.topics.get(topic).kafka_config( &self.values.processing.kafka_config, &self.values.processing.secondary_kafka_configs, diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index f754ee699e..2958d161bf 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -171,18 +171,7 @@ pub struct KafkaTopicConfig { kafka_config_name: String, } -/// Describes Kafka config, with all the parameters extracted, which will be used for creating the -/// kafka producer. -#[derive(Debug)] -pub enum KafkaConfig<'a> { - /// Single config with Kafka parameters. - Single { - /// Kafka parameters to create the kafka producer. - params: KafkaParams<'a>, - }, -} - -/// Sharded Kafka config. +/// Config for creating a Kafka producer. #[derive(Debug)] pub struct KafkaParams<'a> { /// The topic name to use. @@ -208,26 +197,22 @@ impl TopicAssignment { &'a self, default_config: &'a Vec, secondary_configs: &'a BTreeMap>, - ) -> Result, ConfigError> { + ) -> Result, ConfigError> { let kafka_config = match self { - Self::Primary(topic_name) => KafkaConfig::Single { - params: KafkaParams { - topic_name, - config_name: None, - params: default_config.as_slice(), - }, + Self::Primary(topic_name) => KafkaParams { + topic_name, + config_name: None, + params: default_config.as_slice(), }, Self::Secondary(KafkaTopicConfig { topic_name, kafka_config_name, - }) => KafkaConfig::Single { - params: KafkaParams { - config_name: Some(kafka_config_name), - topic_name, - params: secondary_configs - .get(kafka_config_name) - .ok_or(ConfigError::UnknownKafkaConfigName)?, - }, + }) => KafkaParams { + config_name: Some(kafka_config_name), + topic_name, + params: secondary_configs + .get(kafka_config_name) + .ok_or(ConfigError::UnknownKafkaConfigName)?, }, }; @@ -289,11 +274,9 @@ transactions: "ingest-transactions-kafka-topic" .expect("Kafka config for events topic"); assert!(matches!( events_config, - KafkaConfig::Single { - params: KafkaParams { - topic_name: "ingest-events-kafka-topic", - .. - } + KafkaParams { + topic_name: "ingest-events-kafka-topic", + .. } )); @@ -302,12 +285,10 @@ transactions: "ingest-transactions-kafka-topic" .expect("Kafka config for profiles topic"); assert!(matches!( events_config, - KafkaConfig::Single { - params: KafkaParams { - topic_name: "ingest-profiles", - config_name: Some("profiles"), - .. - } + KafkaParams { + topic_name: "ingest-profiles", + config_name: Some("profiles"), + .. } )); @@ -316,11 +297,9 @@ transactions: "ingest-transactions-kafka-topic" .expect("Kafka config for metrics topic"); assert!(matches!( events_config, - KafkaConfig::Single { - params: KafkaParams { - topic_name: "ingest-metrics-3", - .. - } + KafkaParams { + topic_name: "ingest-metrics-3", + .. } )); @@ -330,11 +309,9 @@ transactions: "ingest-transactions-kafka-topic" .expect("Kafka config for transactions topic"); assert!(matches!( transactions_config, - KafkaConfig::Single { - params: KafkaParams { - topic_name: "ingest-transactions-kafka-topic", - .. - } + KafkaParams { + topic_name: "ingest-transactions-kafka-topic", + .. } )); } diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index 97f5db2650..3cdb871769 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -1,7 +1,4 @@ //! This module contains the kafka producer related code. -//! -//! There are two different producers that are supported in Relay right now: -//! - [`SingleProducer`] - which sends all the messages to the defined kafka [`KafkaTopic`], use std::borrow::Cow; use std::cell::Cell; @@ -16,7 +13,7 @@ use rdkafka::ClientConfig; use relay_statsd::metric; use thiserror::Error; -use crate::config::{KafkaConfig, KafkaParams, KafkaTopic}; +use crate::config::{KafkaParams, KafkaTopic}; use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms}; mod utils; @@ -80,16 +77,29 @@ pub trait Message { } /// Single kafka producer config with assigned topic. -struct SingleProducer { +struct Producer { + /// Time of the latest collection of stats. + last_report: Cell, /// Kafka topic name. topic_name: String, /// Real kafka producer. producer: Arc, } -impl fmt::Debug for SingleProducer { +impl Producer { + fn new(topic_name: String, producer: Arc) -> Self { + Self { + last_report: Cell::new(Instant::now()), + topic_name, + producer, + } + } +} + +impl fmt::Debug for Producer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Single") + f.debug_struct("Producer") + .field("last_report", &self.last_report) .field("topic_name", &self.topic_name) .field("producer", &"") .finish() @@ -177,44 +187,42 @@ impl KafkaClientBuilder { pub fn add_kafka_topic_config( mut self, topic: KafkaTopic, - config: &KafkaConfig, + params: &KafkaParams, ) -> Result { let mut client_config = ClientConfig::new(); - match config { - KafkaConfig::Single { params } => { - let KafkaParams { - topic_name, - config_name, - params, - } = params; - - let config_name = config_name.map(str::to_string); - - if let Some(producer) = self.reused_producers.get(&config_name) { - self.producers.insert( - topic, - Producer::single((*topic_name).to_string(), Arc::clone(producer)), - ); - return Ok(self); - } - - for config_p in *params { - client_config.set(config_p.name.as_str(), config_p.value.as_str()); - } - - let producer = Arc::new( - client_config - .create_with_context(Context) - .map_err(ClientError::InvalidConfig)?, - ); - self.reused_producers - .insert(config_name, Arc::clone(&producer)); - self.producers - .insert(topic, Producer::single((*topic_name).to_string(), producer)); - Ok(self) - } + let KafkaParams { + topic_name, + config_name, + params, + } = params; + + let config_name = config_name.map(str::to_string); + + if let Some(producer) = self.reused_producers.get(&config_name) { + self.producers.insert( + topic, + Producer::new((*topic_name).to_string(), Arc::clone(producer)), + ); + return Ok(self); + } + + for config_p in *params { + client_config.set(config_p.name.as_str(), config_p.value.as_str()); } + + let producer = Arc::new( + client_config + .create_with_context(Context) + .map_err(ClientError::InvalidConfig)?, + ); + + self.reused_producers + .insert(config_name, Arc::clone(&producer)); + self.producers + .insert(topic, Producer::new((*topic_name).to_string(), producer)); + + Ok(self) } /// Consumes self and returns the built [`KafkaClient`]. @@ -236,30 +244,7 @@ impl fmt::Debug for KafkaClientBuilder { } } -/// This object contains the Kafka producer variants for single. -#[derive(Debug)] -enum ProducerInner { - /// Configuration variant for the single kafka producer. - Single(SingleProducer), -} - -#[derive(Debug)] -struct Producer { - last_report: Cell, - inner: ProducerInner, -} - impl Producer { - fn single(topic_name: String, producer: Arc) -> Self { - Self { - last_report: Instant::now().into(), - inner: ProducerInner::Single(SingleProducer { - topic_name, - producer, - }), - } - } - /// Sends the payload to the correct producer for the current topic. fn send( &self, @@ -272,12 +257,8 @@ impl Producer { histogram(KafkaHistograms::KafkaMessageSize) = payload.len() as u64, variant = variant ); - let (topic_name, producer) = match &self.inner { - ProducerInner::Single(SingleProducer { - topic_name, - producer, - }) => (topic_name.as_str(), producer.as_ref()), - }; + + let topic_name = self.topic_name.as_str(); let mut record = BaseRecord::to(topic_name).key(key).payload(payload); // Make sure to set the headers if provided. @@ -295,13 +276,13 @@ impl Producer { if self.last_report.get().elapsed() > REPORT_FREQUENCY { self.last_report.replace(Instant::now()); metric!( - gauge(KafkaGauges::InFlightCount) = producer.in_flight_count() as u64, + gauge(KafkaGauges::InFlightCount) = self.producer.in_flight_count() as u64, variant = variant, topic = topic_name ); } - producer + self.producer .send(record) .map(|_| topic_name) .map_err(|(error, _message)| {