Skip to content

Commit

Permalink
Make individual pools non-optional
Browse files Browse the repository at this point in the history
  • Loading branch information
loewenheim committed Jul 22, 2024
1 parent 713e1bf commit ac34785
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 109 deletions.
17 changes: 6 additions & 11 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ use relay_kafka::{
};
use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
use relay_metrics::MetricNamespace;
use relay_redis::{RedisError, RedisPools};
use serde::de::{DeserializeOwned, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use uuid::Uuid;

use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
use crate::byte_size::ByteSize;
use crate::upstream::UpstreamDescriptor;
use crate::{create_redis_pools, RedisConfig, RedisConfigs};
use crate::{create_redis_pools, RedisConfig, RedisConfigs, RedisPoolConfigs};

const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;

Expand Down Expand Up @@ -2272,16 +2271,12 @@ impl Config {
&self.values.processing.topics.unused
}

/// Redis servers to connect to, for rate limiting.
pub fn redis(&self) -> Result<RedisPools, RedisError> {
if !self.processing_enabled() {
return Ok(RedisPools::default());
}
/// Redis servers to connect to for project configs, cardinality limits,
/// rate limiting, and metrics metadata.
pub fn redis(&self) -> Option<RedisPoolConfigs> {
let redis_configs = self.values.processing.redis.as_ref()?;

let Some(redis_configs) = self.values.processing.redis.as_ref() else {
return Ok(RedisPools::default());
};
create_redis_pools(redis_configs, self.cpu_concurrency())
Some(create_redis_pools(redis_configs, self.cpu_concurrency()))
}

/// Chunk size of attachments in bytes.
Expand Down
151 changes: 93 additions & 58 deletions relay-config/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_redis::RedisConfigOptions;
use serde::{Deserialize, Serialize};

/// For small setups, `2 x limits.max_thread_count` does not leave enough headroom.
Expand Down Expand Up @@ -139,24 +139,33 @@ pub enum RedisConfigs {
/// Individual configurations for each pool.
Individual {
/// Configuration for the `project_configs` pool.
#[serde(skip_serializing_if = "Option::is_none")]
project_configs: Option<Box<RedisConfig>>,
project_configs: Box<RedisConfig>,
/// Configuration for the `cardinality` pool.
#[serde(skip_serializing_if = "Option::is_none")]
cardinality: Option<Box<RedisConfig>>,
cardinality: Box<RedisConfig>,
/// Configuration for the `quotas` pool.
#[serde(skip_serializing_if = "Option::is_none")]
quotas: Option<Box<RedisConfig>>,
quotas: Box<RedisConfig>,
/// Configuration for the `misc` pool.
#[serde(skip_serializing_if = "Option::is_none")]
misc: Option<Box<RedisConfig>>,
misc: Box<RedisConfig>,
},
}

fn create_redis_pool(
/// Helper struct bundling connections and options for the various Redis pools.
#[derive(Clone, Debug)]
pub struct RedisPoolConfigs<'a> {
/// Configuration for the `project_configs` pool.
pub project_configs: (&'a RedisConnection, RedisConfigOptions),
/// Configuration for the `cardinality` pool.
pub cardinality: (&'a RedisConnection, RedisConfigOptions),
/// Configuration for the `quotas` pool.
pub quotas: (&'a RedisConnection, RedisConfigOptions),
/// Configuration for the `misc` pool.
pub misc: (&'a RedisConnection, RedisConfigOptions),
}

pub(super) fn create_redis_pool(
config: &RedisConfig,
cpu_concurrency: usize,
) -> Result<RedisPool, RedisError> {
) -> (&RedisConnection, RedisConfigOptions) {
let options = RedisConfigOptions {
max_connections: config
.options
Expand All @@ -170,57 +179,40 @@ fn create_redis_pool(
write_timeout: config.options.write_timeout,
};

match &config.connection {
RedisConnection::Single(server) => RedisPool::single(server, options),
RedisConnection::Cluster(servers) => {
RedisPool::cluster(servers.iter().map(|s| s.as_str()), options)
}
}
(&config.connection, options)
}

pub(super) fn create_redis_pools(
configs: &RedisConfigs,
cpu_concurrency: usize,
) -> Result<RedisPools, RedisError> {
) -> RedisPoolConfigs {
match configs {
RedisConfigs::Unified(cfg) => {
let pool = create_redis_pool(cfg, cpu_concurrency)?;
Ok(RedisPools {
project_configs: Some(pool.clone()),
cardinality: Some(pool.clone()),
quotas: Some(pool.clone()),
misc: Some(pool),
})
let pool = create_redis_pool(cfg, cpu_concurrency);
RedisPoolConfigs {
project_configs: pool.clone(),
cardinality: pool.clone(),
quotas: pool.clone(),
misc: pool,
}
}
RedisConfigs::Individual {
project_configs,
cardinality,
quotas,
misc,
} => {
let project_configs = project_configs
.as_ref()
.map(|cfg| create_redis_pool(cfg, cpu_concurrency))
.transpose()?;
let cardinality = cardinality
.as_ref()
.map(|cfg| create_redis_pool(cfg, cpu_concurrency))
.transpose()?;
let quotas = quotas
.as_ref()
.map(|cfg| create_redis_pool(cfg, cpu_concurrency))
.transpose()?;
let misc = misc
.as_ref()
.map(|cfg| create_redis_pool(cfg, cpu_concurrency))
.transpose()?;

Ok(RedisPools {
let project_configs = create_redis_pool(project_configs, cpu_concurrency);
let cardinality = create_redis_pool(cardinality, cpu_concurrency);
let quotas = create_redis_pool(quotas, cpu_concurrency);
let misc = create_redis_pool(misc, cpu_concurrency);

RedisPoolConfigs {
project_configs,
cardinality,
quotas,
misc,
})
}
}
}
}
Expand Down Expand Up @@ -282,32 +274,41 @@ connection_timeout: 5
#[test]
fn test_redis_individual() {
let yaml = r#"
project_configs:
project_configs:
server: "redis://127.0.0.1:6379"
max_connections: 42
connection_timeout: 5
cardinality:
server: "redis://127.0.0.1:6379"
quotas:
cluster_nodes:
- "redis://127.0.0.1:6379"
- "redis://127.0.0.2:6379"
max_connections: 17
connection_timeout: 5
misc:
cluster_nodes:
- "redis://127.0.0.1:6379"
- "redis://127.0.0.2:6379"
"#;

let configs: RedisConfigs = serde_yaml::from_str(yaml)
.expect("Parsed processing redis configs: single with options");

let expected = RedisConfigs::Individual {
project_configs: Some(Box::new(RedisConfig {
project_configs: Box::new(RedisConfig {
connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
options: PartialRedisConfigOptions {
max_connections: Some(42),
connection_timeout: 5,
..Default::default()
},
})),
cardinality: None,
quotas: Some(Box::new(RedisConfig {
}),
cardinality: Box::new(RedisConfig {
connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
options: Default::default(),
}),
quotas: Box::new(RedisConfig {
connection: RedisConnection::Cluster(vec![
"redis://127.0.0.1:6379".to_owned(),
"redis://127.0.0.2:6379".to_owned(),
Expand All @@ -317,11 +318,17 @@ quotas:
connection_timeout: 5,
..Default::default()
},
})),
misc: None,
}),
misc: Box::new(RedisConfig {
connection: RedisConnection::Cluster(vec![
"redis://127.0.0.1:6379".to_owned(),
"redis://127.0.0.2:6379".to_owned(),
]),
options: Default::default(),
}),
};

assert_eq!(configs, expected,);
assert_eq!(configs, expected);
}

#[test]
Expand Down Expand Up @@ -519,16 +526,19 @@ read_timeout: 10
#[test]
fn test_redis_serialize_individual() {
let configs = RedisConfigs::Individual {
project_configs: Some(Box::new(RedisConfig {
project_configs: Box::new(RedisConfig {
connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
options: PartialRedisConfigOptions {
max_connections: Some(42),
connection_timeout: 5,
..Default::default()
},
})),
cardinality: None,
quotas: Some(Box::new(RedisConfig {
}),
cardinality: Box::new(RedisConfig {
connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
options: Default::default(),
}),
quotas: Box::new(RedisConfig {
connection: RedisConnection::Cluster(vec![
"redis://127.0.0.1:6379".to_owned(),
"redis://127.0.0.2:6379".to_owned(),
Expand All @@ -538,8 +548,14 @@ read_timeout: 10
connection_timeout: 5,
..Default::default()
},
})),
misc: None,
}),
misc: Box::new(RedisConfig {
connection: RedisConnection::Cluster(vec![
"redis://127.0.0.1:6379".to_owned(),
"redis://127.0.0.2:6379".to_owned(),
]),
options: Default::default(),
}),
};

assert_json_snapshot!(configs, @r###"
Expand All @@ -553,6 +569,14 @@ read_timeout: 10
"read_timeout": 3,
"write_timeout": 3
},
"cardinality": {
"server": "redis://127.0.0.1:6379",
"connection_timeout": 5,
"max_lifetime": 300,
"idle_timeout": 60,
"read_timeout": 3,
"write_timeout": 3
},
"quotas": {
"cluster_nodes": [
"redis://127.0.0.1:6379",
Expand All @@ -564,6 +588,17 @@ read_timeout: 10
"idle_timeout": 60,
"read_timeout": 3,
"write_timeout": 3
},
"misc": {
"cluster_nodes": [
"redis://127.0.0.1:6379",
"redis://127.0.0.2:6379"
],
"connection_timeout": 5,
"max_lifetime": 300,
"idle_timeout": 60,
"read_timeout": 3,
"write_timeout": 3
}
}
"###);
Expand Down
10 changes: 5 additions & 5 deletions relay-redis/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ impl RedisPool {
}

/// The various [`RedisPool`]s used within Relay.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct RedisPools {
/// The pool used for project configurations
pub project_configs: Option<RedisPool>,
pub project_configs: RedisPool,
/// The pool used for cardinality limits.
pub cardinality: Option<RedisPool>,
pub cardinality: RedisPool,
/// The pool used for rate limiting/quotas.
pub quotas: Option<RedisPool>,
pub quotas: RedisPool,
/// The pool used for metrics metadata.
pub misc: Option<RedisPool>,
pub misc: RedisPool,
}
10 changes: 5 additions & 5 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,16 @@ impl RedisPool {
}

/// The various [`RedisPool`]s used within Relay.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct RedisPools {
/// The pool used for project configurations
pub project_configs: Option<RedisPool>,
pub project_configs: RedisPool,
/// The pool used for cardinality limits.
pub cardinality: Option<RedisPool>,
pub cardinality: RedisPool,
/// The pool used for rate limiting/quotas.
pub quotas: Option<RedisPool>,
pub quotas: RedisPool,
/// The pool used for metrics metadata.
pub misc: Option<RedisPool>,
pub misc: RedisPool,
}

/// Stats about how the [`RedisPool`] is performing.
Expand Down
Loading

0 comments on commit ac34785

Please sign in to comment.