diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index 70d7bc1c3e..3b1b514400 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -16,7 +16,6 @@ use relay_kafka::{
 };
 use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
 use relay_metrics::MetricNamespace;
-use relay_redis::{RedisError, RedisPools};
 use serde::de::{DeserializeOwned, Unexpected, Visitor};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use uuid::Uuid;
@@ -24,7 +23,7 @@ use uuid::Uuid;
 use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig};
 use crate::byte_size::ByteSize;
 use crate::upstream::UpstreamDescriptor;
-use crate::{create_redis_pools, RedisConfig, RedisConfigs};
+use crate::{create_redis_pools, RedisConfig, RedisConfigs, RedisPoolConfigs};
 
 const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10;
 
@@ -2272,16 +2271,12 @@ impl Config {
         &self.values.processing.topics.unused
     }
 
-    /// Redis servers to connect to, for rate limiting.
-    pub fn redis(&self) -> Result<RedisPools, RedisError> {
-        if !self.processing_enabled() {
-            return Ok(RedisPools::default());
-        }
+    /// Redis servers to connect to for project configs, cardinality limits,
+    /// rate limiting, and metrics metadata.
+    pub fn redis(&self) -> Option<RedisPoolConfigs> {
+        let redis_configs = self.values.processing.redis.as_ref()?;
 
-        let Some(redis_configs) = self.values.processing.redis.as_ref() else {
-            return Ok(RedisPools::default());
-        };
-        create_redis_pools(redis_configs, self.cpu_concurrency())
+        Some(create_redis_pools(redis_configs, self.cpu_concurrency()))
     }
 
     /// Chunk size of attachments in bytes.
diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs
index 02b546f819..98e46317a6 100644
--- a/relay-config/src/redis.rs
+++ b/relay-config/src/redis.rs
@@ -1,4 +1,4 @@
-use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
+use relay_redis::RedisConfigOptions;
 use serde::{Deserialize, Serialize};
 
 /// For small setups, `2 x limits.max_thread_count` does not leave enough headroom.
@@ -139,24 +139,33 @@ pub enum RedisConfigs {
     /// Individual configurations for each pool.
     Individual {
         /// Configuration for the `project_configs` pool.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        project_configs: Option<Box<RedisConfig>>,
+        project_configs: Box<RedisConfig>,
         /// Configuration for the `cardinality` pool.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        cardinality: Option<Box<RedisConfig>>,
+        cardinality: Box<RedisConfig>,
         /// Configuration for the `quotas` pool.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        quotas: Option<Box<RedisConfig>>,
+        quotas: Box<RedisConfig>,
         /// Configuration for the `misc` pool.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        misc: Option<Box<RedisConfig>>,
+        misc: Box<RedisConfig>,
     },
 }
 
-fn create_redis_pool(
+/// Helper struct bundling connections and options for the various Redis pools.
+#[derive(Clone, Debug)]
+pub struct RedisPoolConfigs<'a> {
+    /// Configuration for the `project_configs` pool.
+    pub project_configs: (&'a RedisConnection, RedisConfigOptions),
+    /// Configuration for the `cardinality` pool.
+    pub cardinality: (&'a RedisConnection, RedisConfigOptions),
+    /// Configuration for the `quotas` pool.
+    pub quotas: (&'a RedisConnection, RedisConfigOptions),
+    /// Configuration for the `misc` pool.
+    pub misc: (&'a RedisConnection, RedisConfigOptions),
+}
+
+pub(super) fn create_redis_pool(
     config: &RedisConfig,
     cpu_concurrency: usize,
-) -> Result<RedisPool, RedisError> {
+) -> (&RedisConnection, RedisConfigOptions) {
     let options = RedisConfigOptions {
         max_connections: config
             .options
@@ -170,27 +179,22 @@
         write_timeout: config.options.write_timeout,
     };
 
-    match &config.connection {
-        RedisConnection::Single(server) => RedisPool::single(server, options),
-        RedisConnection::Cluster(servers) => {
-            RedisPool::cluster(servers.iter().map(|s| s.as_str()), options)
-        }
-    }
+    (&config.connection, options)
 }
 
 pub(super) fn create_redis_pools(
     configs: &RedisConfigs,
     cpu_concurrency: usize,
-) -> Result<RedisPools, RedisError> {
+) -> RedisPoolConfigs {
     match configs {
         RedisConfigs::Unified(cfg) => {
-            let pool = create_redis_pool(cfg, cpu_concurrency)?;
-            Ok(RedisPools {
-                project_configs: Some(pool.clone()),
-                cardinality: Some(pool.clone()),
-                quotas: Some(pool.clone()),
-                misc: Some(pool),
-            })
+            let pool = create_redis_pool(cfg, cpu_concurrency);
+            RedisPoolConfigs {
+                project_configs: pool.clone(),
+                cardinality: pool.clone(),
+                quotas: pool.clone(),
+                misc: pool,
+            }
         }
         RedisConfigs::Individual {
             project_configs,
@@ -198,29 +202,17 @@ pub(super) fn create_redis_pools(
             quotas,
             misc,
         } => {
-            let project_configs = project_configs
-                .as_ref()
-                .map(|cfg| create_redis_pool(cfg, cpu_concurrency))
-                .transpose()?;
-            let cardinality = cardinality
-                .as_ref()
-                .map(|cfg| create_redis_pool(cfg, cpu_concurrency))
-                .transpose()?;
-            let quotas = quotas
-                .as_ref()
-                .map(|cfg| create_redis_pool(cfg, cpu_concurrency))
-                .transpose()?;
-            let misc = misc
-                .as_ref()
-                .map(|cfg| create_redis_pool(cfg, cpu_concurrency))
-                .transpose()?;
-
-            Ok(RedisPools {
+            let project_configs = create_redis_pool(project_configs, cpu_concurrency);
+            let cardinality = create_redis_pool(cardinality, cpu_concurrency);
+            let quotas = create_redis_pool(quotas, cpu_concurrency);
+            let misc = create_redis_pool(misc, cpu_concurrency);
+
+            RedisPoolConfigs {
                 project_configs,
                 cardinality,
                 quotas,
                 misc,
-            })
+            }
         }
     }
 }
@@ -282,32 +274,41 @@ connection_timeout: 5
     #[test]
     fn test_redis_individual() {
         let yaml = r#"
-project_configs: 
+project_configs:
   server: "redis://127.0.0.1:6379"
   max_connections: 42
   connection_timeout: 5
+cardinality:
+  server: "redis://127.0.0.1:6379"
 quotas:
   cluster_nodes:
     - "redis://127.0.0.1:6379"
     - "redis://127.0.0.2:6379"
   max_connections: 17
   connection_timeout: 5
+misc:
+  cluster_nodes:
+    - "redis://127.0.0.1:6379"
+    - "redis://127.0.0.2:6379"
 "#;
 
         let configs: RedisConfigs = serde_yaml::from_str(yaml)
             .expect("Parsed processing redis configs: single with options");
 
         let expected = RedisConfigs::Individual {
-            project_configs: Some(Box::new(RedisConfig {
+            project_configs: Box::new(RedisConfig {
                 connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
                 options: PartialRedisConfigOptions {
                     max_connections: Some(42),
                     connection_timeout: 5,
                     ..Default::default()
                 },
-            })),
-            cardinality: None,
-            quotas: Some(Box::new(RedisConfig {
+            }),
+            cardinality: Box::new(RedisConfig {
+                connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
+                options: Default::default(),
+            }),
+            quotas: Box::new(RedisConfig {
                 connection: RedisConnection::Cluster(vec![
                     "redis://127.0.0.1:6379".to_owned(),
                     "redis://127.0.0.2:6379".to_owned(),
@@ -317,11 +318,17 @@ quotas:
                     connection_timeout: 5,
                     ..Default::default()
                 },
-            })),
-            misc: None,
+            }),
+            misc: Box::new(RedisConfig {
+                connection: RedisConnection::Cluster(vec![
+                    "redis://127.0.0.1:6379".to_owned(),
+                    "redis://127.0.0.2:6379".to_owned(),
+                ]),
+                options: Default::default(),
+            }),
         };
 
-        assert_eq!(configs, expected,);
+        assert_eq!(configs, expected);
     }
 
     #[test]
@@ -519,16 +526,19 @@ read_timeout: 10
     #[test]
     fn test_redis_serialize_individual() {
         let configs = RedisConfigs::Individual {
-            project_configs: Some(Box::new(RedisConfig {
+            project_configs: Box::new(RedisConfig {
                 connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
                 options: PartialRedisConfigOptions {
                     max_connections: Some(42),
                     connection_timeout: 5,
                     ..Default::default()
                 },
-            })),
-            cardinality: None,
-            quotas: Some(Box::new(RedisConfig {
+            }),
+            cardinality: Box::new(RedisConfig {
+                connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()),
+                options: Default::default(),
+            }),
+            quotas: Box::new(RedisConfig {
                 connection: RedisConnection::Cluster(vec![
                     "redis://127.0.0.1:6379".to_owned(),
                     "redis://127.0.0.2:6379".to_owned(),
@@ -538,8 +548,14 @@ read_timeout: 10
                     connection_timeout: 5,
                     ..Default::default()
                 },
-            })),
-            misc: None,
+            }),
+            misc: Box::new(RedisConfig {
+                connection: RedisConnection::Cluster(vec![
+                    "redis://127.0.0.1:6379".to_owned(),
+                    "redis://127.0.0.2:6379".to_owned(),
+                ]),
+                options: Default::default(),
+            }),
         };
 
         assert_json_snapshot!(configs, @r###"
@@ -553,6 +569,14 @@ read_timeout: 10
             "read_timeout": 3,
             "write_timeout": 3
           },
+          "cardinality": {
+            "server": "redis://127.0.0.1:6379",
+            "connection_timeout": 5,
+            "max_lifetime": 300,
+            "idle_timeout": 60,
+            "read_timeout": 3,
+            "write_timeout": 3
+          },
           "quotas": {
             "cluster_nodes": [
               "redis://127.0.0.1:6379",
@@ -564,6 +588,17 @@ read_timeout: 10
             "idle_timeout": 60,
             "read_timeout": 3,
             "write_timeout": 3
+          },
+          "misc": {
+            "cluster_nodes": [
+              "redis://127.0.0.1:6379",
+              "redis://127.0.0.2:6379"
+            ],
+            "connection_timeout": 5,
+            "max_lifetime": 300,
+            "idle_timeout": 60,
+            "read_timeout": 3,
+            "write_timeout": 3
           }
         }
         "###);
diff --git a/relay-redis/src/noop.rs b/relay-redis/src/noop.rs
index b3cf26011a..db08723155 100644
--- a/relay-redis/src/noop.rs
+++ b/relay-redis/src/noop.rs
@@ -31,14 +31,14 @@ impl RedisPool {
 }
 
 /// The various [`RedisPool`]s used within Relay.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone)]
 pub struct RedisPools {
     /// The pool used for project configurations
-    pub project_configs: Option<RedisPool>,
+    pub project_configs: RedisPool,
     /// The pool used for cardinality limits.
-    pub cardinality: Option<RedisPool>,
+    pub cardinality: RedisPool,
     /// The pool used for rate limiting/quotas.
-    pub quotas: Option<RedisPool>,
+    pub quotas: RedisPool,
     /// The pool used for metrics metadata.
-    pub misc: Option<RedisPool>,
+    pub misc: RedisPool,
 }
diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs
index cd0ebab053..f5514131e7 100644
--- a/relay-redis/src/real.rs
+++ b/relay-redis/src/real.rs
@@ -209,16 +209,16 @@ impl RedisPool {
 }
 
 /// The various [`RedisPool`]s used within Relay.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone)]
 pub struct RedisPools {
     /// The pool used for project configurations
-    pub project_configs: Option<RedisPool>,
+    pub project_configs: RedisPool,
     /// The pool used for cardinality limits.
-    pub cardinality: Option<RedisPool>,
+    pub cardinality: RedisPool,
     /// The pool used for rate limiting/quotas.
-    pub quotas: Option<RedisPool>,
+    pub quotas: RedisPool,
     /// The pool used for metrics metadata.
-    pub misc: Option<RedisPool>,
+    pub misc: RedisPool,
 }
 
 /// Stats about how the [`RedisPool`] is performing.
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index 98e1791740..2ead0b8f0e 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -10,7 +10,8 @@ use axum::extract::FromRequestParts;
 use axum::http::request::Parts;
 use rayon::ThreadPool;
 use relay_cogs::Cogs;
-use relay_config::Config;
+use relay_config::{Config, RedisConnection, RedisPoolConfigs};
+use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
 use relay_system::{channel, Addr, Service};
 use tokio::runtime::Runtime;
 
@@ -133,7 +134,12 @@ impl ServiceState {
         let upstream_relay = UpstreamRelayService::new(config.clone()).start();
         let test_store = TestStoreService::new(config.clone()).start();
 
-        let redis_pools = config.redis().context(ServiceError::Redis)?;
+        let redis_pools = config
+            .redis()
+            .filter(|_| config.processing_enabled())
+            .map(create_redis_pools)
+            .transpose()
+            .context(ServiceError::Redis)?;
 
         let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size()));
 
@@ -227,7 +233,9 @@ impl ServiceState {
             buffer_guard.clone(),
             project_cache_services,
             metric_outcomes,
-            redis_pools.project_configs.clone(),
+            redis_pools
+                .as_ref()
+                .map(|pools| pools.project_configs.clone()),
         )
         .spawn_handler(project_cache_rx);
 
@@ -332,6 +340,32 @@ impl ServiceState {
     }
 }
 
+fn create_redis_pool(
+    connection: &RedisConnection,
+    options: RedisConfigOptions,
+) -> Result<RedisPool, RedisError> {
+    match connection {
+        RedisConnection::Cluster(servers) => {
+            RedisPool::cluster(servers.iter().map(|s| s.as_str()), options)
+        }
+        RedisConnection::Single(server) => RedisPool::single(server, options),
+    }
+}
+
+pub fn create_redis_pools(configs: RedisPoolConfigs) -> Result<RedisPools, RedisError> {
+    let project_configs = create_redis_pool(configs.project_configs.0, configs.project_configs.1)?;
+    let cardinality = create_redis_pool(configs.cardinality.0, configs.cardinality.1)?;
+    let quotas = create_redis_pool(configs.quotas.0, configs.quotas.1)?;
+    let misc = create_redis_pool(configs.misc.0, configs.misc.1)?;
+
+    Ok(RedisPools {
+        project_configs,
+        cardinality,
+        quotas,
+        misc,
+    })
+}
+
 #[axum::async_trait]
 impl FromRequestParts<Self> for ServiceState {
     type Rejection = Infallible;
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index a82ab0699e..ddb4701f26 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -1137,7 +1137,7 @@ impl EnvelopeProcessorService {
         config: Arc<Config>,
         global_config: GlobalConfigHandle,
         cogs: Cogs,
-        #[cfg(feature = "processing")] redis: RedisPools,
+        #[cfg(feature = "processing")] redis: Option<RedisPools>,
         addrs: Addrs,
         metric_outcomes: MetricOutcomes,
     ) -> Self {
@@ -1151,35 +1151,41 @@
             }
         });
 
+        #[cfg(feature = "processing")]
+        let (cardinality, quotas, misc) = match redis {
+            Some(RedisPools {
+                cardinality,
+                quotas,
+                misc,
+                ..
+            }) => (Some(cardinality), Some(quotas), Some(misc)),
+            None => (None, None, None),
+        };
+
         let inner = InnerProcessor {
             pool,
             global_config,
             cogs,
             #[cfg(feature = "processing")]
             // TODO: Tentatively using `quotas` for this
-            redis_pool: redis.quotas.clone(),
+            redis_pool: quotas.clone(),
             #[cfg(feature = "processing")]
-            rate_limiter: redis
-                .quotas
-                .clone()
-                .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())),
+            rate_limiter: quotas.map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())),
             addrs,
             geoip_lookup,
             #[cfg(feature = "processing")]
-            metric_meta_store: redis.misc.clone().map(|pool| {
-                RedisMetricMetaStore::new(pool, config.metrics_meta_locations_expiry())
-            }),
+            metric_meta_store: misc.map(|misc| {
+                RedisMetricMetaStore::new(misc, config.metrics_meta_locations_expiry())
+            }),
             #[cfg(feature = "processing")]
-            cardinality_limiter: redis
-                .cardinality
-                .clone()
-                .map(|pool| {
+            cardinality_limiter: cardinality
+                .map(|cardinality| {
                     RedisSetLimiter::new(
                         RedisSetLimiterOptions {
                             cache_vacuum_interval: config
                                 .cardinality_limiter_cache_vacuum_interval(),
                         },
-                        pool,
+                        cardinality,
                     )
                 })
                 .map(CardinalityLimiter::new),
diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs
index cb2460a7b7..8170a03c97 100644
--- a/relay-server/src/services/stats.rs
+++ b/relay-server/src/services/stats.rs
@@ -17,14 +17,14 @@ pub struct RelayStats {
     config: Arc<Config>,
     upstream_relay: Addr<UpstreamRelay>,
     #[cfg(feature = "processing")]
-    redis_pools: RedisPools,
+    redis_pools: Option<RedisPools>,
 }
 
 impl RelayStats {
     pub fn new(
         config: Arc<Config>,
         upstream_relay: Addr<UpstreamRelay>,
-        #[cfg(feature = "processing")] redis_pools: RedisPools,
+        #[cfg(feature = "processing")] redis_pools: Option<RedisPools>,
     ) -> Self {
         Self {
             config,
@@ -118,19 +118,16 @@ impl RelayStats {
     #[cfg(feature = "processing")]
     async fn redis_pools(&self) {
-        if let Some(ref project_configs) = self.redis_pools.project_configs {
+        if let Some(RedisPools {
+            project_configs,
+            cardinality,
+            quotas,
+            misc,
+        }) = self.redis_pools.as_ref()
+        {
             Self::redis_pool(project_configs, "project_configs").await;
-        }
-
-        if let Some(ref cardinality) = self.redis_pools.cardinality {
             Self::redis_pool(cardinality, "cardinality").await;
-        }
-
-        if let Some(ref quotas) = self.redis_pools.quotas {
             Self::redis_pool(quotas, "quotas").await;
-        }
-
-        if let Some(ref misc) = self.redis_pools.misc {
             Self::redis_pool(misc, "misc").await;
         }
     }
 
diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs
index 1d034f0723..2f515f638f 100644
--- a/relay-server/src/testutils.rs
+++ b/relay-server/src/testutils.rs
@@ -15,6 +15,7 @@ use relay_test::mock_service;
 use crate::envelope::{Envelope, Item, ItemType};
 use crate::extractors::RequestMeta;
 use crate::metrics::{MetricOutcomes, MetricStats};
+use crate::service::create_redis_pools;
 use crate::services::global_config::GlobalConfigHandle;
 use crate::services::outcome::TrackOutcome;
 use crate::services::processor::{self, EnvelopeProcessorService};
@@ -120,7 +121,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService {
     let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
 
     #[cfg(feature = "processing")]
-    let redis_pools = config.redis().unwrap();
+    let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap();
 
     let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone());
 
@@ -149,7 +150,7 @@ pub fn create_test_processor_with_addrs(
     addrs: processor::Addrs,
 ) -> EnvelopeProcessorService {
     #[cfg(feature = "processing")]
-    let redis_pools = config.redis().unwrap();
+    let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap();
 
     let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone());