ref(kafka): Remove ShardedProducer (#3415)
iambriccardo authored Apr 15, 2024
1 parent b49bfa4 commit f02d3b9
Showing 6 changed files with 57 additions and 352 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
**Features**:

- **Breaking change:** Kafka topic configuration keys now support the default topic name. The previous aliases `metrics` and `metrics_transactions` are no longer supported if configuring topics manually. Use `ingest-metrics` or `metrics_sessions` instead of `metrics`, and `ingest-performance-metrics` or `metrics_generic` instead of `metrics_transactions`. ([#3361](https://github.com/getsentry/relay/pull/3361))
- **Breaking change:** Remove `ShardedProducer` and related code. The sharded configuration for Kafka is no longer supported; a migration sketch follows this list. ([#3415](https://github.com/getsentry/relay/pull/3415))
- Add support for continuous profiling. ([#3270](https://github.com/getsentry/relay/pull/3270))
- Add support for Reporting API for CSP reports ([#3277](https://github.com/getsentry/relay/pull/3277))
- Extract op and description while converting opentelemetry spans to sentry spans. ([#3287](https://github.com/getsentry/relay/pull/3287))
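
For context, a sketch of the migration this entails (topic and config names are taken from examples elsewhere in this diff, not prescriptive): the sharded form is rejected after this change and must be replaced with a plain topic assignment.

```yaml
# No longer supported: sharded topic assignment.
ingest-metrics:
  shards: 65000
  mapping:
    0:
      name: "ingest-metrics-1"
      config: "metrics_1"
    25000:
      name: "ingest-metrics-2"
      config: "metrics_2"

# Still supported: a single topic name, or a secondary `name`/`config` pair.
ingest-metrics: "ingest-metrics-1"
```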
146 changes: 32 additions & 114 deletions relay-kafka/src/config.rs
@@ -3,8 +3,6 @@
//! The configuration can be either:
//! - [`TopicAssignment::Primary`] - the main and default kafka configuration,
//! - [`TopicAssignment::Secondary`] - used to configure any additional kafka topic,
//! - [`TopicAssignment::Sharded`] - if we want to configure multiple kafka clusters,
//! we can create a mapping of the range of logical shards to the kafka configuration.

use std::collections::BTreeMap;

@@ -160,9 +158,6 @@ pub enum TopicAssignment {
/// `secondary_kafka_configs`. In this case that custom kafka config will be used to produce
/// data to the given topic name.
Secondary(KafkaTopicConfig),
/// If we want to configure multiple kafka clusters, we can create a mapping of the
/// range of logical shards to the kafka configuration.
Sharded(Sharded),
}

/// Configuration for a topic
@@ -176,37 +171,6 @@ pub struct KafkaTopicConfig {
kafka_config_name: String,
}

/// Configuration for logical shards -> kafka configuration mapping.
///
/// The configuration for this should look like:
///
/// ```ignore
/// metrics:
/// shards: 65000
/// mapping:
/// 0:
/// name: "ingest-metrics-1"
/// config: "metrics_1"
/// 25000:
/// name: "ingest-metrics-2"
/// config: "metrics_2"
/// 45000:
/// name: "ingest-metrics-3"
/// config: "metrics_3"
/// ```
///
/// where `shards` defines how many logical shards are created and `mapping`
/// describes the per-shard configuration. Each key in `mapping` is the inclusive
/// start index of a shard range; the range extends to the next key or to the
/// maximum defined by the `shards` option. The first key must always be 0.
#[derive(Serialize, Deserialize, Debug)]
pub struct Sharded {
/// The number of shards used for this topic.
shards: u64,
/// The Kafka configuration assigned to the specific shard range.
mapping: BTreeMap<u64, KafkaTopicConfig>,
}
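
To make the removed semantics concrete, here is a minimal standalone sketch of the shard-range lookup described above (the mapping values and the sharding key are hypothetical):

```rust
use std::collections::BTreeMap;

fn main() {
    // Start index of each shard range -> topic name, mirroring the YAML above.
    let mapping = BTreeMap::from([
        (0u64, "ingest-metrics-1"),
        (25_000, "ingest-metrics-2"),
        (45_000, "ingest-metrics-3"),
    ]);
    let shards = 65_000u64;

    let sharding_key = 1_234_567u64;
    let shard = sharding_key % shards; // 64_567
    // The owning range is the last entry whose start index is <= `shard`.
    let (start, topic) = mapping
        .range(..=shard)
        .next_back()
        .expect("mapping must contain shard 0");
    println!("key {sharding_key} -> shard {shard} (range start {start}) -> {topic}");
}
```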

/// Describes Kafka config, with all the parameters extracted, which will be used for creating the
/// kafka producer.
#[derive(Debug)]
@@ -216,14 +180,6 @@ pub enum KafkaConfig<'a> {
/// Kafka parameters to create the kafka producer.
params: KafkaParams<'a>,
},

/// The list of the Kafka configs with related shard configs.
Sharded {
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
},
}

/// Sharded Kafka config.
@@ -273,27 +229,6 @@ impl TopicAssignment {
.ok_or(ConfigError::UnknownKafkaConfigName)?,
},
},
Self::Sharded(Sharded { shards, mapping }) => {
// quick fail if the config does not contain shard 0
if !mapping.contains_key(&0) {
return Err(ConfigError::InvalidShard);
}
let mut kafka_params = BTreeMap::new();
for (shard, kafka_config) in mapping {
let config = KafkaParams {
topic_name: kafka_config.topic_name.as_str(),
config_name: Some(kafka_config.kafka_config_name.as_str()),
params: secondary_configs
.get(kafka_config.kafka_config_name.as_str())
.ok_or(ConfigError::UnknownKafkaConfigName)?,
};
kafka_params.insert(*shard, config);
}
KafkaConfig::Sharded {
shards: *shards,
configs: kafka_params,
}
}
};

Ok(kafka_config)
@@ -321,18 +256,7 @@ ingest-events: "ingest-events-kafka-topic"
profiles:
name: "ingest-profiles"
config: "profiles"
ingest-metrics:
shards: 65000
mapping:
0:
name: "ingest-metrics-1"
config: "metrics_1"
25000:
name: "ingest-metrics-2"
config: "metrics_2"
45000:
name: "ingest-metrics-3"
config: "metrics_3"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

@@ -348,41 +272,17 @@
value: "test-value".to_string(),
}],
);
second_config.insert(
"metrics_1".to_string(),
vec![KafkaConfigParam {
name: "test".to_string(),
value: "test-value".to_string(),
}],
);
second_config.insert(
"metrics_2".to_string(),
vec![KafkaConfigParam {
name: "test".to_string(),
value: "test-value".to_string(),
}],
);
second_config.insert(
"metrics_3".to_string(),
vec![KafkaConfigParam {
name: "test".to_string(),
value: "test-value".to_string(),
}],
);

let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
let events = topics.events;
let profiles = topics.profiles;
let metrics = topics.metrics_sessions;
let metrics_sessions = topics.metrics_sessions;
let transactions = topics.transactions;

assert!(matches!(events, TopicAssignment::Primary(_)));
assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
assert!(matches!(metrics, TopicAssignment::Sharded { .. }));

let events_config = metrics
.kafka_config(&def_config, &second_config)
.expect("Kafka config for metrics topic");
assert!(matches!(events_config, KafkaConfig::Sharded { .. }));
assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
assert!(matches!(transactions, TopicAssignment::Primary(_)));

let events_config = events
.kafka_config(&def_config, &second_config)
Expand All @@ -397,6 +297,33 @@ transactions: "ingest-transactions-kafka-topic"
}
));

let events_config = profiles
.kafka_config(&def_config, &second_config)
.expect("Kafka config for profiles topic");
assert!(matches!(
events_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-profiles",
config_name: Some("profiles"),
..
}
}
));

let events_config = metrics_sessions
.kafka_config(&def_config, &second_config)
.expect("Kafka config for metrics topic");
assert!(matches!(
events_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-metrics-3",
..
}
}
));

// Legacy keys are still supported
let transactions_config = transactions
.kafka_config(&def_config, &second_config)
@@ -410,15 +337,6 @@ transactions: "ingest-transactions-kafka-topic"
}
}
));

let (shards, mapping) =
if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics {
(shards, mapping)
} else {
unreachable!()
};
assert_eq!(shards, 65000);
assert_eq!(3, mapping.len());
}

#[test]
99 changes: 2 additions & 97 deletions relay-kafka/src/producer/mod.rs
@@ -2,9 +2,6 @@
//!
//! There are two different producers that are supported in Relay right now:
//! - [`SingleProducer`] - which sends all the messages to the defined kafka [`KafkaTopic`],
//! - [`ShardedProducer`] - which expects at least one shard to be configured, and depending on
//! the shard number sends messages to different topics using the producer configured for
//! that exact shard.

use std::borrow::Cow;
use std::cell::Cell;
@@ -99,54 +96,6 @@ impl fmt::Debug for SingleProducer {
}
}

/// Sharded producer configuration.
struct ShardedProducer {
/// The maximum number of shards for this producer.
shards: u64,
/// The actual Kafka producer assigned to the range of logical shards, where the `u64` in the map is
/// the inclusive beginning of the range.
producers: BTreeMap<u64, (String, Arc<ThreadedProducer>)>,
}

impl ShardedProducer {
/// Returns topic name and the Kafka producer based on the provided sharding key.
/// Returns error [`ClientError::InvalidShard`] if the shard range for the provided sharding
/// key could not be found.
///
/// # Errors
/// Returns [`ClientError::InvalidShard`] error if the provided `sharding_key` could not be
/// placed in any configured shard ranges.
pub fn get_producer(
&self,
sharding_key: u64,
) -> Result<(&str, &ThreadedProducer), ClientError> {
let shard = sharding_key % self.shards;
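        // `producers` is a `BTreeMap`, so iteration runs in ascending key order;
        // the match is therefore the last entry whose start index is <= `shard`.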
let (topic_name, producer) = self
.producers
.iter()
.take_while(|(k, _)| *k <= &shard)
.last()
.map(|(_, v)| v)
.ok_or(ClientError::InvalidShard)?;

Ok((topic_name, producer))
}
}

impl fmt::Debug for ShardedProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let producers = &self
.producers
.iter()
.map(|(shard, (topic, _))| (shard, topic))
.collect::<BTreeMap<_, _>>();
f.debug_struct("ShardedProducer")
.field("shards", &self.shards)
.field("producers", producers)
.finish()
}
}

/// Keeps all the configured kafka producers and is responsible for routing the messages.
#[derive(Debug)]
pub struct KafkaClient {
@@ -167,7 +116,6 @@ impl KafkaClient {
pub fn send_message(
&self,
topic: KafkaTopic,
organization_id: u64,
message: &impl Message,
) -> Result<&str, ClientError> {
let serialized = message.serialize()?;
@@ -179,7 +127,6 @@
let key = message.key();
self.send(
topic,
organization_id,
&key,
message.headers(),
message.variant(),
@@ -193,7 +140,6 @@
pub fn send(
&self,
topic: KafkaTopic,
organization_id: u64,
key: &[u8; 16],
headers: Option<&BTreeMap<String, String>>,
variant: &str,
@@ -205,7 +151,7 @@
);
ClientError::InvalidTopicName
})?;
producer.send(organization_id, key, headers, variant, payload)
producer.send(key, headers, variant, payload)
}
}
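
At call sites the change amounts to one argument fewer. A hypothetical before/after with stand-in types (not the real relay-kafka definitions):

```rust
// Stand-ins for illustration only.
struct KafkaClient;

#[derive(Clone, Copy)]
enum KafkaTopic {
    Events,
}

impl KafkaClient {
    // After this commit, the topic alone selects the producer; no
    // organization id is needed for shard routing.
    fn send_message(&self, _topic: KafkaTopic, _payload: &[u8]) -> Result<&'static str, ()> {
        Ok("ingest-events")
    }
}

fn main() {
    let client = KafkaClient;
    // Before: client.send_message(KafkaTopic::Events, organization_id, &message)
    let topic = client.send_message(KafkaTopic::Events, b"payload").unwrap();
    println!("sent to {topic}");
}
```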

@@ -268,34 +214,6 @@ impl KafkaClientBuilder {
.insert(topic, Producer::single((*topic_name).to_string(), producer));
Ok(self)
}
KafkaConfig::Sharded { shards, configs } => {
let mut producers = BTreeMap::new();
for (shard, kafka_params) in configs {
let config_name = kafka_params.config_name.map(str::to_string);
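                    // Shards that reference the same Kafka config name reuse a
                    // single underlying producer instance.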
if let Some(producer) = self.reused_producers.get(&config_name) {
let cached_producer = Arc::clone(producer);
producers.insert(
*shard,
(kafka_params.topic_name.to_string(), cached_producer),
);
continue;
}
for config_p in kafka_params.params {
client_config.set(config_p.name.as_str(), config_p.value.as_str());
}
let producer = Arc::new(
client_config
.create_with_context(Context)
.map_err(ClientError::InvalidConfig)?,
);
self.reused_producers
.insert(config_name, Arc::clone(&producer));
producers.insert(*shard, (kafka_params.topic_name.to_string(), producer));
}
self.producers
.insert(topic, Producer::sharded(*shards, producers));
Ok(self)
}
}
}

@@ -318,14 +236,11 @@
}
}

/// This object contains the Kafka producer variants for single and sharded configurations.
/// This object contains the Kafka producer variant for the single configuration.
#[derive(Debug)]
enum ProducerInner {
/// Configuration variant for the single kafka producer.
Single(SingleProducer),
/// Configuration variant for sharded kafka producer, when one topic has different producers
/// dedicated to the range of the shards.
Sharded(ShardedProducer),
}

#[derive(Debug)]
@@ -345,17 +260,9 @@ impl Producer {
}
}

fn sharded(shards: u64, producers: BTreeMap<u64, (String, Arc<ThreadedProducer>)>) -> Self {
Self {
last_report: Instant::now().into(),
inner: ProducerInner::Sharded(ShardedProducer { shards, producers }),
}
}

/// Sends the payload to the correct producer for the current topic.
fn send(
&self,
organization_id: u64,
key: &[u8; 16],
headers: Option<&BTreeMap<String, String>>,
variant: &str,
@@ -370,8 +277,6 @@ impl Producer {
topic_name,
producer,
}) => (topic_name.as_str(), producer.as_ref()),

ProducerInner::Sharded(sharded) => sharded.get_producer(organization_id)?,
};
let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
