From db09075154d94460c945db15c22834c4c4327aa0 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 19 Jul 2024 15:22:01 +0200 Subject: [PATCH 01/21] WIP --- relay-config/src/config.rs | 29 ++++-------- relay-config/src/redis.rs | 97 +++++++++++++++++++++++++++++++++++--- 2 files changed, 98 insertions(+), 28 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index b7f42abeab..7dbb18a93d 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -16,7 +16,6 @@ use relay_kafka::{ }; use relay_metrics::aggregator::{AggregatorConfig, FlushBatching}; use relay_metrics::MetricNamespace; -use relay_redis::RedisConfigOptions; use serde::de::{DeserializeOwned, Unexpected, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use uuid::Uuid; @@ -24,7 +23,7 @@ use uuid::Uuid; use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig}; use crate::byte_size::ByteSize; use crate::upstream::UpstreamDescriptor; -use crate::{RedisConfig, RedisConnection}; +use crate::{RedisConfig, RedisConfigs, RedisPools}; const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10; @@ -1024,7 +1023,7 @@ pub struct Processing { pub kafka_validate_topics: bool, /// Redis hosts to connect to for storing state for rate limits. #[serde(default)] - pub redis: Option, + pub redis: Option, /// Maximum chunk size of attachments for Kafka. #[serde(default = "default_chunk_size")] pub attachment_chunk_size: ByteSize, @@ -1577,8 +1576,9 @@ impl Config { } } + // TODO: Should this be generalized to being able to override individual redis pool settings? if let Some(redis) = overrides.redis_url { - processing.redis = Some(RedisConfig::single(redis)) + processing.redis = Some(RedisConfigs::Single(RedisConfig::single(redis))) } if let Some(kafka_url) = overrides.kafka_url { @@ -2273,25 +2273,12 @@ impl Config { } /// Redis servers to connect to, for rate limiting. - pub fn redis(&self) -> Option<(&RedisConnection, RedisConfigOptions)> { - let cpu_concurrency = self.cpu_concurrency(); - - let redis = self.values.processing.redis.as_ref()?; - - let options = RedisConfigOptions { - max_connections: redis - .options - .max_connections - .unwrap_or(cpu_concurrency as u32 * 2) - .min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), - connection_timeout: redis.options.connection_timeout, - max_lifetime: redis.options.max_lifetime, - idle_timeout: redis.options.idle_timeout, - read_timeout: redis.options.read_timeout, - write_timeout: redis.options.write_timeout, + pub fn redis(&self) -> RedisPools { + let Some(redis_configs) = self.values.processing.redis.as_ref() else { + return RedisPools::default(); }; - Some((&redis.connection, options)) + RedisPools::from_configs(redis_configs, self.cpu_concurrency()) } /// Chunk size of attachments in bytes. diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 437d1e5b15..d351dfeaec 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -1,3 +1,4 @@ +use relay_redis::RedisConfigOptions; use serde::{Deserialize, Serialize}; /// For small setups, `2 x limits.max_thread_count` does not leave enough headroom. @@ -86,7 +87,8 @@ pub enum RedisConnection { } /// Configuration for connecting a redis client. -#[derive(Clone, Debug, Serialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(from = "RedisConfigFromFile")] pub struct RedisConfig { /// Redis connection info. #[serde(flatten)] @@ -128,12 +130,93 @@ impl From for RedisConfig { } } -impl<'de> Deserialize<'de> for RedisConfig { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - RedisConfigFromFile::deserialize(deserializer).map(Into::into) +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(untagged)] +pub enum RedisConfigs { + Single(RedisConfig), + Individual { + project_config: Option>, + cardinality: Option>, + quotas: Option>, + misc: Option>, + }, +} + +#[derive(Debug, Clone)] +pub struct RedisPool<'a> { + connection: &'a RedisConnection, + options: RedisConfigOptions, +} + +impl<'a> RedisPool<'a> { + fn from_config(config: &'a RedisConfig, cpu_concurrency: usize) -> Self { + let options = RedisConfigOptions { + max_connections: config + .options + .max_connections + .unwrap_or(cpu_concurrency as u32 * 2) + .min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), + connection_timeout: config.options.connection_timeout, + max_lifetime: config.options.max_lifetime, + idle_timeout: config.options.idle_timeout, + read_timeout: config.options.read_timeout, + write_timeout: config.options.write_timeout, + }; + + Self { + connection: &config.connection, + options, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct RedisPools<'a> { + project_config: Option>, + cardinality: Option>, + quotas: Option>, + misc: Option>, +} + +impl<'a> RedisPools<'a> { + pub fn from_configs(configs: &'a RedisConfigs, cpu_concurrency: usize) -> Self { + match configs { + RedisConfigs::Single(cfg) => { + let pool = RedisPool::from_config(&cfg, cpu_concurrency); + Self { + project_config: Some(pool.clone()), + cardinality: Some(pool.clone()), + quotas: Some(pool.clone()), + misc: Some(pool), + } + } + RedisConfigs::Individual { + project_config, + cardinality, + quotas, + misc, + } => { + let project_config = project_config + .as_ref() + .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); + let cardinality = cardinality + .as_ref() + .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); + let quotas = quotas + .as_ref() + .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); + let misc = misc + .as_ref() + .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); + + Self { + project_config, + cardinality, + quotas, + misc, + } + } + } } } From 6aa5495d41dbd1b43b107856515e58cbace52d1f Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 19 Jul 2024 16:24:43 +0200 Subject: [PATCH 02/21] WIP (it works?) --- relay-config/src/redis.rs | 12 +++--- relay-redis/src/real.rs | 8 ++++ relay-server/src/service.rs | 73 ++++++++++++++++++++++++++++------- relay-server/src/testutils.rs | 35 ++++------------- 4 files changed, 79 insertions(+), 49 deletions(-) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index d351dfeaec..2db44756cf 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -144,8 +144,8 @@ pub enum RedisConfigs { #[derive(Debug, Clone)] pub struct RedisPool<'a> { - connection: &'a RedisConnection, - options: RedisConfigOptions, + pub connection: &'a RedisConnection, + pub options: RedisConfigOptions, } impl<'a> RedisPool<'a> { @@ -172,10 +172,10 @@ impl<'a> RedisPool<'a> { #[derive(Debug, Clone, Default)] pub struct RedisPools<'a> { - project_config: Option>, - cardinality: Option>, - quotas: Option>, - misc: Option>, + pub project_config: Option>, + pub cardinality: Option>, + pub quotas: Option>, + pub misc: Option>, } impl<'a> RedisPools<'a> { diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index 6b4f129940..4ddaac1a21 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -208,6 +208,14 @@ impl RedisPool { } } +#[derive(Debug, Clone, Default)] +pub struct RedisPools { + pub project_config: Option, + pub cardinality: Option, + pub quotas: Option, + pub misc: Option, +} + /// Stats about how the [`RedisPool`] is performing. pub struct Stats { /// The number of connections currently being managed by the pool. diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 651dedf15d..56baf32374 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -11,7 +11,7 @@ use axum::http::request::Parts; use rayon::ThreadPool; use relay_cogs::Cogs; use relay_config::{Config, RedisConnection}; -use relay_redis::RedisPool; +use relay_redis::{RedisError, RedisPool, RedisPools}; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; @@ -134,17 +134,7 @@ impl ServiceState { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); - let redis_pool = config - .redis() - .filter(|_| config.processing_enabled()) - .map(|redis| match redis { - (RedisConnection::Single(server), options) => RedisPool::single(server, options), - (RedisConnection::Cluster(servers), options) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options) - } - }) - .transpose() - .context(ServiceError::Redis)?; + let redis_pools = create_redis_pools(&config)?; let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size())); @@ -210,7 +200,8 @@ impl ServiceState { global_config_handle, cogs, #[cfg(feature = "processing")] - redis_pool.clone(), + // TODO: Figure out right pool + redis_pools.project_config.clone(), processor::Addrs { project_cache: project_cache.clone(), outcome_aggregator: outcome_aggregator.clone(), @@ -238,7 +229,8 @@ impl ServiceState { buffer_guard.clone(), project_cache_services, metric_outcomes, - redis_pool.clone(), + // TODO: Figure out right pool + redis_pools.project_config.clone(), ) .spawn_handler(project_cache_rx); @@ -254,7 +246,8 @@ impl ServiceState { config.clone(), upstream_relay.clone(), #[cfg(feature = "processing")] - redis_pool, + // TODO: Figure out right pool + redis_pools.project_config.clone(), ) .start(); @@ -343,6 +336,56 @@ impl ServiceState { } } +fn create_redis_pool( + config: relay_config::RedisPool, +) -> Result { + match config.connection { + RedisConnection::Single(server) => RedisPool::single(server, config.options), + RedisConnection::Cluster(servers) => { + RedisPool::cluster(servers.iter().map(|s| s.as_str()), config.options) + } + } +} + +pub fn create_redis_pools(config: &Config) -> Result { + Ok(if !config.processing_enabled() { + RedisPools::default() + } else { + let pools = config.redis(); + + let project_config = pools + .project_config + .map(create_redis_pool) + .transpose() + .context(ServiceError::Redis)?; + + let cardinality = pools + .cardinality + .map(create_redis_pool) + .transpose() + .context(ServiceError::Redis)?; + + let quotas = pools + .quotas + .map(create_redis_pool) + .transpose() + .context(ServiceError::Redis)?; + + let misc = pools + .misc + .map(create_redis_pool) + .transpose() + .context(ServiceError::Redis)?; + + RedisPools { + project_config, + cardinality, + quotas, + misc, + } + }) +} + #[axum::async_trait] impl FromRequestParts for ServiceState { type Rejection = Infallible; diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index d14e5e16ac..50dbc44019 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -12,12 +12,10 @@ use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use relay_system::Addr; use relay_test::mock_service; -#[cfg(feature = "processing")] -use {relay_config::RedisConnection, relay_redis::RedisPool}; - use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::metrics::{MetricOutcomes, MetricStats}; +use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; @@ -123,17 +121,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = "processing")] - let redis = config - .redis() - .filter(|_| config.processing_enabled()) - .map(|redis| match redis { - (RedisConnection::Single(server), options) => { - RedisPool::single(server, options).unwrap() - } - (RedisConnection::Cluster(servers), options) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options).unwrap() - } - }); + let redis_pools = create_redis_pools(&config).unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); @@ -144,7 +132,8 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - redis, + // TODO: Figure out right pool + redis_pools.project_config, processor::Addrs { outcome_aggregator, project_cache, @@ -162,18 +151,7 @@ pub fn create_test_processor_with_addrs( addrs: processor::Addrs, ) -> EnvelopeProcessorService { #[cfg(feature = "processing")] - let redis = config - .redis() - .filter(|_| config.processing_enabled()) - .map(|redis| match redis { - (RedisConnection::Single(server), options) => { - RedisPool::single(server, options).unwrap() - } - (RedisConnection::Cluster(servers), options) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options).unwrap() - } - }); - + let redis_pools = create_redis_pools(&config).unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone()); @@ -184,7 +162,8 @@ pub fn create_test_processor_with_addrs( GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - redis, + // TODO: Figure out right pool + redis_pools.project_config, addrs, metric_outcomes, ) From a659a94ebc066475c3124a1efc6d6cc315375291 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 19 Jul 2024 16:47:36 +0200 Subject: [PATCH 03/21] Single -> Unified --- relay-config/src/config.rs | 2 +- relay-config/src/redis.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 7dbb18a93d..cd41cbabc3 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -1578,7 +1578,7 @@ impl Config { // TODO: Should this be generalized to being able to override individual redis pool settings? if let Some(redis) = overrides.redis_url { - processing.redis = Some(RedisConfigs::Single(RedisConfig::single(redis))) + processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis))) } if let Some(kafka_url) = overrides.kafka_url { diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 2db44756cf..3a953bc0aa 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -133,7 +133,7 @@ impl From for RedisConfig { #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(untagged)] pub enum RedisConfigs { - Single(RedisConfig), + Unified(RedisConfig), Individual { project_config: Option>, cardinality: Option>, @@ -181,7 +181,7 @@ pub struct RedisPools<'a> { impl<'a> RedisPools<'a> { pub fn from_configs(configs: &'a RedisConfigs, cpu_concurrency: usize) -> Self { match configs { - RedisConfigs::Single(cfg) => { + RedisConfigs::Unified(cfg) => { let pool = RedisPool::from_config(&cfg, cpu_concurrency); Self { project_config: Some(pool.clone()), From 01f047a4d7b26775a4df25c55000b427d4d3328f Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 19 Jul 2024 16:47:43 +0200 Subject: [PATCH 04/21] Tests --- relay-config/src/redis.rs | 203 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 3a953bc0aa..fc115b90e5 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -135,9 +135,13 @@ impl From for RedisConfig { pub enum RedisConfigs { Unified(RedisConfig), Individual { + #[serde(skip_serializing_if = "Option::is_none")] project_config: Option>, + #[serde(skip_serializing_if = "Option::is_none")] cardinality: Option>, + #[serde(skip_serializing_if = "Option::is_none")] quotas: Option>, + #[serde(skip_serializing_if = "Option::is_none")] misc: Option>, }, } @@ -250,6 +254,75 @@ connection_timeout: 5 ); } + #[test] + fn test_redis_single_opts_unified() { + let yaml = r#" +server: "redis://127.0.0.1:6379" +max_connections: 42 +connection_timeout: 5 +"#; + + let config: RedisConfigs = serde_yaml::from_str(yaml) + .expect("Parsed processing redis config: single with options"); + + assert_eq!( + config, + RedisConfigs::Unified(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: PartialRedisConfigOptions { + max_connections: Some(42), + connection_timeout: 5, + ..Default::default() + } + }) + ); + } + + #[test] + fn test_redis_individual() { + let yaml = r#" +project_config: + server: "redis://127.0.0.1:6379" + max_connections: 42 + connection_timeout: 5 +quotas: + cluster_nodes: + - "redis://127.0.0.1:6379" + - "redis://127.0.0.2:6379" + max_connections: 17 + connection_timeout: 5 +"#; + + let configs: RedisConfigs = serde_yaml::from_str(yaml) + .expect("Parsed processing redis configs: single with options"); + + let expected = RedisConfigs::Individual { + project_config: Some(Box::new(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: PartialRedisConfigOptions { + max_connections: Some(42), + connection_timeout: 5, + ..Default::default() + }, + })), + cardinality: None, + quotas: Some(Box::new(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: PartialRedisConfigOptions { + max_connections: Some(17), + connection_timeout: 5, + ..Default::default() + }, + })), + misc: None, + }; + + assert_eq!(configs, expected,); + } + #[test] fn test_redis_single_serialize() { let config = RedisConfig { @@ -272,6 +345,28 @@ connection_timeout: 5 "###); } + #[test] + fn test_redis_single_serialize_unified() { + let configs = RedisConfigs::Unified(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: PartialRedisConfigOptions { + connection_timeout: 5, + ..Default::default() + }, + }); + + assert_json_snapshot!(configs, @r###" + { + "server": "redis://127.0.0.1:6379", + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + } + "###); + } + #[test] fn test_redis_single_opts_default() { let yaml = r#" @@ -337,6 +432,33 @@ read_timeout: 10 ); } + #[test] + fn test_redis_cluster_nodes_opts_unified() { + let yaml = r#" +cluster_nodes: + - "redis://127.0.0.1:6379" + - "redis://127.0.0.2:6379" +read_timeout: 10 +"#; + + let config: RedisConfigs = serde_yaml::from_str(yaml) + .expect("Parsed processing redis config: single with options"); + + assert_eq!( + config, + RedisConfigs::Unified(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned() + ]), + options: PartialRedisConfigOptions { + read_timeout: 10, + ..Default::default() + }, + }) + ); + } + #[test] fn test_redis_cluster_serialize() { let config = RedisConfig { @@ -364,4 +486,85 @@ read_timeout: 10 } "###); } + + #[test] + fn test_redis_cluster_serialize_unified() { + let configs = RedisConfigs::Unified(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: PartialRedisConfigOptions { + read_timeout: 33, + ..Default::default() + }, + }); + + assert_json_snapshot!(configs, @r###" + { + "cluster_nodes": [ + "redis://127.0.0.1:6379", + "redis://127.0.0.2:6379" + ], + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 33, + "write_timeout": 3 + } + "###); + } + + #[test] + fn test_redis_serialize_individual() { + let configs = RedisConfigs::Individual { + project_config: Some(Box::new(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: PartialRedisConfigOptions { + max_connections: Some(42), + connection_timeout: 5, + ..Default::default() + }, + })), + cardinality: None, + quotas: Some(Box::new(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: PartialRedisConfigOptions { + max_connections: Some(17), + connection_timeout: 5, + ..Default::default() + }, + })), + misc: None, + }; + + assert_json_snapshot!(configs, @r###" + { + "project_config": { + "server": "redis://127.0.0.1:6379", + "max_connections": 42, + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + }, + "quotas": { + "cluster_nodes": [ + "redis://127.0.0.1:6379", + "redis://127.0.0.2:6379" + ], + "max_connections": 17, + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + } + } + "###); + } } From 573104453d344c55c546256810459504f952e5a9 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 11:25:42 +0200 Subject: [PATCH 05/21] Pass pools through --- relay-server/src/service.rs | 7 +--- relay-server/src/services/processor.rs | 10 +++-- relay-server/src/services/stats.rs | 51 +++++++++++++++++++------- relay-server/src/testutils.rs | 6 +-- 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 56baf32374..9687b3e333 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -200,8 +200,7 @@ impl ServiceState { global_config_handle, cogs, #[cfg(feature = "processing")] - // TODO: Figure out right pool - redis_pools.project_config.clone(), + redis_pools.clone(), processor::Addrs { project_cache: project_cache.clone(), outcome_aggregator: outcome_aggregator.clone(), @@ -229,7 +228,6 @@ impl ServiceState { buffer_guard.clone(), project_cache_services, metric_outcomes, - // TODO: Figure out right pool redis_pools.project_config.clone(), ) .spawn_handler(project_cache_rx); @@ -246,8 +244,7 @@ impl ServiceState { config.clone(), upstream_relay.clone(), #[cfg(feature = "processing")] - // TODO: Figure out right pool - redis_pools.project_config.clone(), + redis_pools.clone(), ) .start(); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 5a834fe970..6a0a36baa0 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -37,6 +37,7 @@ use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::{Annotated, Value}; use relay_quotas::{DataCategory, Scoping}; +use relay_redis::RedisPools; use relay_sampling::config::RuleId; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; use relay_sampling::DynamicSamplingContext; @@ -1137,7 +1138,7 @@ impl EnvelopeProcessorService { config: Arc, global_config: GlobalConfigHandle, cogs: Cogs, - #[cfg(feature = "processing")] redis: Option, + #[cfg(feature = "processing")] redis: RedisPools, addrs: Addrs, metric_outcomes: MetricOutcomes, ) -> Self { @@ -1156,19 +1157,22 @@ impl EnvelopeProcessorService { global_config, cogs, #[cfg(feature = "processing")] - redis_pool: redis.clone(), + // TODO: Tentatively using `misc` for this + redis_pool: redis.misc.clone(), #[cfg(feature = "processing")] rate_limiter: redis + .quotas .clone() .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())), addrs, geoip_lookup, #[cfg(feature = "processing")] - metric_meta_store: redis.clone().map(|pool| { + metric_meta_store: redis.misc.clone().map(|pool| { RedisMetricMetaStore::new(pool, config.metrics_meta_locations_expiry()) }), #[cfg(feature = "processing")] cardinality_limiter: redis + .cardinality .clone() .map(|pool| { RedisSetLimiter::new( diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a2a1a960d7..06bc99d581 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; #[cfg(feature = "processing")] -use relay_redis::RedisPool; +use relay_redis::RedisPools; use relay_statsd::metric; use relay_system::{Addr, Service}; use tokio::time::interval; @@ -17,20 +17,20 @@ pub struct RelayStats { config: Arc, upstream_relay: Addr, #[cfg(feature = "processing")] - redis_pool: Option, + redis_pools: RedisPools, } impl RelayStats { pub fn new( config: Arc, upstream_relay: Addr, - #[cfg(feature = "processing")] redis_pool: Option, + #[cfg(feature = "processing")] redis_pools: RedisPools, ) -> Self { Self { config, upstream_relay, #[cfg(feature = "processing")] - redis_pool, + redis_pools, } } @@ -101,17 +101,42 @@ impl RelayStats { } #[cfg(not(feature = "processing"))] - async fn redis_pool(&self) {} + async fn redis_pools(&self) {} #[cfg(feature = "processing")] - async fn redis_pool(&self) { - let Some(ref redis_pool) = self.redis_pool else { - return; - }; + async fn redis_pools(&self) { + // TODO: How to report values for the different pools? Tag them? + if let Some(ref project_config) = self.redis_pools.project_config { + let state = project_config.stats(); + metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); + metric!( + gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) + ); + } + + if let Some(ref cardinality) = self.redis_pools.cardinality { + let state = cardinality.stats(); + metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); + metric!( + gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) + ); + } + + if let Some(ref quotas) = self.redis_pools.quotas { + let state = quotas.stats(); + metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); + metric!( + gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) + ); + } - let state = redis_pool.stats(); - metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); - metric!(gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections)); + if let Some(ref misc) = self.redis_pools.misc { + let state = misc.stats(); + metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); + metric!( + gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) + ); + } } } @@ -128,7 +153,7 @@ impl Service for RelayStats { let _ = tokio::join!( self.upstream_status(), self.tokio_metrics(), - self.redis_pool(), + self.redis_pools(), ); ticker.tick().await; } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 50dbc44019..9bf1e349ff 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -132,8 +132,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - // TODO: Figure out right pool - redis_pools.project_config, + redis_pools, processor::Addrs { outcome_aggregator, project_cache, @@ -162,8 +161,7 @@ pub fn create_test_processor_with_addrs( GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - // TODO: Figure out right pool - redis_pools.project_config, + redis_pools, addrs, metric_outcomes, ) From 6bf23f286655d4068b70231aaaecd9f08a33f97d Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 12:33:51 +0200 Subject: [PATCH 06/21] project_config -> project_configs --- relay-config/src/redis.rs | 20 +++++++------- relay-redis/src/real.rs | 2 +- relay-server/src/service.rs | 8 +++--- relay-server/src/services/stats.rs | 42 ++++++++++++++---------------- 4 files changed, 34 insertions(+), 38 deletions(-) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index fc115b90e5..96da61a7cd 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -136,7 +136,7 @@ pub enum RedisConfigs { Unified(RedisConfig), Individual { #[serde(skip_serializing_if = "Option::is_none")] - project_config: Option>, + project_configs: Option>, #[serde(skip_serializing_if = "Option::is_none")] cardinality: Option>, #[serde(skip_serializing_if = "Option::is_none")] @@ -176,7 +176,7 @@ impl<'a> RedisPool<'a> { #[derive(Debug, Clone, Default)] pub struct RedisPools<'a> { - pub project_config: Option>, + pub project_configs: Option>, pub cardinality: Option>, pub quotas: Option>, pub misc: Option>, @@ -188,19 +188,19 @@ impl<'a> RedisPools<'a> { RedisConfigs::Unified(cfg) => { let pool = RedisPool::from_config(&cfg, cpu_concurrency); Self { - project_config: Some(pool.clone()), + project_configs: Some(pool.clone()), cardinality: Some(pool.clone()), quotas: Some(pool.clone()), misc: Some(pool), } } RedisConfigs::Individual { - project_config, + project_configs, cardinality, quotas, misc, } => { - let project_config = project_config + let project_configs = project_configs .as_ref() .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); let cardinality = cardinality @@ -214,7 +214,7 @@ impl<'a> RedisPools<'a> { .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); Self { - project_config, + project_configs, cardinality, quotas, misc, @@ -281,7 +281,7 @@ connection_timeout: 5 #[test] fn test_redis_individual() { let yaml = r#" -project_config: +project_configs: server: "redis://127.0.0.1:6379" max_connections: 42 connection_timeout: 5 @@ -297,7 +297,7 @@ quotas: .expect("Parsed processing redis configs: single with options"); let expected = RedisConfigs::Individual { - project_config: Some(Box::new(RedisConfig { + project_configs: Some(Box::new(RedisConfig { connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), options: PartialRedisConfigOptions { max_connections: Some(42), @@ -518,7 +518,7 @@ read_timeout: 10 #[test] fn test_redis_serialize_individual() { let configs = RedisConfigs::Individual { - project_config: Some(Box::new(RedisConfig { + project_configs: Some(Box::new(RedisConfig { connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), options: PartialRedisConfigOptions { max_connections: Some(42), @@ -543,7 +543,7 @@ read_timeout: 10 assert_json_snapshot!(configs, @r###" { - "project_config": { + "project_configs": { "server": "redis://127.0.0.1:6379", "max_connections": 42, "connection_timeout": 5, diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index 4ddaac1a21..6c255c63d1 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -210,7 +210,7 @@ impl RedisPool { #[derive(Debug, Clone, Default)] pub struct RedisPools { - pub project_config: Option, + pub project_configs: Option, pub cardinality: Option, pub quotas: Option, pub misc: Option, diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 9687b3e333..120838d3a4 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -228,7 +228,7 @@ impl ServiceState { buffer_guard.clone(), project_cache_services, metric_outcomes, - redis_pools.project_config.clone(), + redis_pools.project_configs.clone(), ) .spawn_handler(project_cache_rx); @@ -350,8 +350,8 @@ pub fn create_redis_pools(config: &Config) -> Result } else { let pools = config.redis(); - let project_config = pools - .project_config + let project_configs = pools + .project_configs .map(create_redis_pool) .transpose() .context(ServiceError::Redis)?; @@ -375,7 +375,7 @@ pub fn create_redis_pools(config: &Config) -> Result .context(ServiceError::Redis)?; RedisPools { - project_config, + project_configs, cardinality, quotas, misc, diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 06bc99d581..0cd190b23c 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; #[cfg(feature = "processing")] -use relay_redis::RedisPools; +use relay_redis::{RedisPool, RedisPools}; use relay_statsd::metric; use relay_system::{Addr, Service}; use tokio::time::interval; @@ -103,39 +103,35 @@ impl RelayStats { #[cfg(not(feature = "processing"))] async fn redis_pools(&self) {} + #[cfg(feature = "processing")] + async fn redis_pool(redis_pool: &RedisPool, name: &str) { + let state = redis_pool.stats(); + metric!( + gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections), + pool = name + ); + metric!( + gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections), + pool = name + ); + } + #[cfg(feature = "processing")] async fn redis_pools(&self) { - // TODO: How to report values for the different pools? Tag them? - if let Some(ref project_config) = self.redis_pools.project_config { - let state = project_config.stats(); - metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); - metric!( - gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) - ); + if let Some(ref project_configs) = self.redis_pools.project_configs { + Self::redis_pool(project_configs, "project_configs").await; } if let Some(ref cardinality) = self.redis_pools.cardinality { - let state = cardinality.stats(); - metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); - metric!( - gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) - ); + Self::redis_pool(cardinality, "cardinality").await; } if let Some(ref quotas) = self.redis_pools.quotas { - let state = quotas.stats(); - metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); - metric!( - gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) - ); + Self::redis_pool(quotas, "quotas").await; } if let Some(ref misc) = self.redis_pools.misc { - let state = misc.stats(); - metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); - metric!( - gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections) - ); + Self::redis_pool(misc, "misc").await; } } } From 6cbbf9bed297d440ae62e43d4f5df9992f6d38db Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 12:34:32 +0200 Subject: [PATCH 07/21] Use quotas for reservoir --- relay-server/src/services/processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 6a0a36baa0..c1e231ac12 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1157,8 +1157,8 @@ impl EnvelopeProcessorService { global_config, cogs, #[cfg(feature = "processing")] - // TODO: Tentatively using `misc` for this - redis_pool: redis.misc.clone(), + // TODO: Tentatively using `quotas` for this + redis_pool: redis.quotas.clone(), #[cfg(feature = "processing")] rate_limiter: redis .quotas From 3b3f0c407fb094dfa7803235405cca72fef3d6b5 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 12:36:53 +0200 Subject: [PATCH 08/21] Remove TODO --- relay-config/src/config.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index cd41cbabc3..9549d031a7 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -1576,7 +1576,6 @@ impl Config { } } - // TODO: Should this be generalized to being able to override individual redis pool settings? if let Some(redis) = overrides.redis_url { processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis))) } From 696490b646d0df868b9b0cdec27f6e75521f0d3e Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 13:18:14 +0200 Subject: [PATCH 09/21] Refactor --- relay-config/src/config.rs | 14 ++-- relay-config/src/redis.rs | 128 ++++++++++++++--------------- relay-server/src/service.rs | 55 +------------ relay-server/src/services/stats.rs | 6 +- relay-server/src/testutils.rs | 5 +- 5 files changed, 77 insertions(+), 131 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 9549d031a7..70d7bc1c3e 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -16,6 +16,7 @@ use relay_kafka::{ }; use relay_metrics::aggregator::{AggregatorConfig, FlushBatching}; use relay_metrics::MetricNamespace; +use relay_redis::{RedisError, RedisPools}; use serde::de::{DeserializeOwned, Unexpected, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use uuid::Uuid; @@ -23,7 +24,7 @@ use uuid::Uuid; use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig}; use crate::byte_size::ByteSize; use crate::upstream::UpstreamDescriptor; -use crate::{RedisConfig, RedisConfigs, RedisPools}; +use crate::{create_redis_pools, RedisConfig, RedisConfigs}; const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10; @@ -2272,12 +2273,15 @@ impl Config { } /// Redis servers to connect to, for rate limiting. - pub fn redis(&self) -> RedisPools { + pub fn redis(&self) -> Result { + if !self.processing_enabled() { + return Ok(RedisPools::default()); + } + let Some(redis_configs) = self.values.processing.redis.as_ref() else { - return RedisPools::default(); + return Ok(RedisPools::default()); }; - - RedisPools::from_configs(redis_configs, self.cpu_concurrency()) + create_redis_pools(redis_configs, self.cpu_concurrency()) } /// Chunk size of attachments in bytes. diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 96da61a7cd..5bf02294c8 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -1,4 +1,4 @@ -use relay_redis::RedisConfigOptions; +use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; use serde::{Deserialize, Serialize}; /// For small setups, `2 x limits.max_thread_count` does not leave enough headroom. @@ -146,80 +146,74 @@ pub enum RedisConfigs { }, } -#[derive(Debug, Clone)] -pub struct RedisPool<'a> { - pub connection: &'a RedisConnection, - pub options: RedisConfigOptions, -} - -impl<'a> RedisPool<'a> { - fn from_config(config: &'a RedisConfig, cpu_concurrency: usize) -> Self { - let options = RedisConfigOptions { - max_connections: config - .options - .max_connections - .unwrap_or(cpu_concurrency as u32 * 2) - .min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), - connection_timeout: config.options.connection_timeout, - max_lifetime: config.options.max_lifetime, - idle_timeout: config.options.idle_timeout, - read_timeout: config.options.read_timeout, - write_timeout: config.options.write_timeout, - }; - - Self { - connection: &config.connection, - options, +fn create_redis_pool( + config: &RedisConfig, + cpu_concurrency: usize, +) -> Result { + let options = RedisConfigOptions { + max_connections: config + .options + .max_connections + .unwrap_or(cpu_concurrency as u32 * 2) + .min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), + connection_timeout: config.options.connection_timeout, + max_lifetime: config.options.max_lifetime, + idle_timeout: config.options.idle_timeout, + read_timeout: config.options.read_timeout, + write_timeout: config.options.write_timeout, + }; + + match &config.connection { + RedisConnection::Single(server) => RedisPool::single(server, options), + RedisConnection::Cluster(servers) => { + RedisPool::cluster(servers.iter().map(|s| s.as_str()), options) } } } -#[derive(Debug, Clone, Default)] -pub struct RedisPools<'a> { - pub project_configs: Option>, - pub cardinality: Option>, - pub quotas: Option>, - pub misc: Option>, -} - -impl<'a> RedisPools<'a> { - pub fn from_configs(configs: &'a RedisConfigs, cpu_concurrency: usize) -> Self { - match configs { - RedisConfigs::Unified(cfg) => { - let pool = RedisPool::from_config(&cfg, cpu_concurrency); - Self { - project_configs: Some(pool.clone()), - cardinality: Some(pool.clone()), - quotas: Some(pool.clone()), - misc: Some(pool), - } - } - RedisConfigs::Individual { +pub(super) fn create_redis_pools( + configs: &RedisConfigs, + cpu_concurrency: usize, +) -> Result { + match configs { + RedisConfigs::Unified(cfg) => { + let pool = create_redis_pool(cfg, cpu_concurrency)?; + Ok(RedisPools { + project_configs: Some(pool.clone()), + cardinality: Some(pool.clone()), + quotas: Some(pool.clone()), + misc: Some(pool), + }) + } + RedisConfigs::Individual { + project_configs, + cardinality, + quotas, + misc, + } => { + let project_configs = project_configs + .as_ref() + .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) + .transpose()?; + let cardinality = cardinality + .as_ref() + .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) + .transpose()?; + let quotas = quotas + .as_ref() + .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) + .transpose()?; + let misc = misc + .as_ref() + .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) + .transpose()?; + + Ok(RedisPools { project_configs, cardinality, quotas, misc, - } => { - let project_configs = project_configs - .as_ref() - .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); - let cardinality = cardinality - .as_ref() - .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); - let quotas = quotas - .as_ref() - .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); - let misc = misc - .as_ref() - .map(|cfg| RedisPool::from_config(cfg, cpu_concurrency)); - - Self { - project_configs, - cardinality, - quotas, - misc, - } - } + }) } } } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 120838d3a4..98e1791740 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -10,8 +10,7 @@ use axum::extract::FromRequestParts; use axum::http::request::Parts; use rayon::ThreadPool; use relay_cogs::Cogs; -use relay_config::{Config, RedisConnection}; -use relay_redis::{RedisError, RedisPool, RedisPools}; +use relay_config::Config; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; @@ -134,7 +133,7 @@ impl ServiceState { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); - let redis_pools = create_redis_pools(&config)?; + let redis_pools = config.redis().context(ServiceError::Redis)?; let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size())); @@ -333,56 +332,6 @@ impl ServiceState { } } -fn create_redis_pool( - config: relay_config::RedisPool, -) -> Result { - match config.connection { - RedisConnection::Single(server) => RedisPool::single(server, config.options), - RedisConnection::Cluster(servers) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), config.options) - } - } -} - -pub fn create_redis_pools(config: &Config) -> Result { - Ok(if !config.processing_enabled() { - RedisPools::default() - } else { - let pools = config.redis(); - - let project_configs = pools - .project_configs - .map(create_redis_pool) - .transpose() - .context(ServiceError::Redis)?; - - let cardinality = pools - .cardinality - .map(create_redis_pool) - .transpose() - .context(ServiceError::Redis)?; - - let quotas = pools - .quotas - .map(create_redis_pool) - .transpose() - .context(ServiceError::Redis)?; - - let misc = pools - .misc - .map(create_redis_pool) - .transpose() - .context(ServiceError::Redis)?; - - RedisPools { - project_configs, - cardinality, - quotas, - misc, - } - }) -} - #[axum::async_trait] impl FromRequestParts for ServiceState { type Rejection = Infallible; diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 0cd190b23c..cb2460a7b7 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -100,9 +100,6 @@ impl RelayStats { } } - #[cfg(not(feature = "processing"))] - async fn redis_pools(&self) {} - #[cfg(feature = "processing")] async fn redis_pool(redis_pool: &RedisPool, name: &str) { let state = redis_pool.stats(); @@ -116,6 +113,9 @@ impl RelayStats { ); } + #[cfg(not(feature = "processing"))] + async fn redis_pools(&self) {} + #[cfg(feature = "processing")] async fn redis_pools(&self) { if let Some(ref project_configs) = self.redis_pools.project_configs { diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 9bf1e349ff..1d034f0723 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -15,7 +15,6 @@ use relay_test::mock_service; use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::metrics::{MetricOutcomes, MetricStats}; -use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; @@ -121,7 +120,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = "processing")] - let redis_pools = create_redis_pools(&config).unwrap(); + let redis_pools = config.redis().unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); @@ -150,7 +149,7 @@ pub fn create_test_processor_with_addrs( addrs: processor::Addrs, ) -> EnvelopeProcessorService { #[cfg(feature = "processing")] - let redis_pools = create_redis_pools(&config).unwrap(); + let redis_pools = config.redis().unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone()); From 30ca38e525e37a19516d6dd15f0265f665a24cd3 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 13:31:33 +0200 Subject: [PATCH 10/21] Docs --- relay-config/src/redis.rs | 7 +++++++ relay-redis/src/real.rs | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 5bf02294c8..3c72241de5 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -130,17 +130,24 @@ impl From for RedisConfig { } } +/// Configurations for the various Redis pools used by Relay. #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(untagged)] pub enum RedisConfigs { + /// All pools should be configured the same way. Unified(RedisConfig), + /// Individual configurations for each pool. Individual { + /// Configuration for the `project_configs` pool. #[serde(skip_serializing_if = "Option::is_none")] project_configs: Option>, + /// Configuration for the `cardinality` pool. #[serde(skip_serializing_if = "Option::is_none")] cardinality: Option>, + /// Configuration for the `quotas` pool. #[serde(skip_serializing_if = "Option::is_none")] quotas: Option>, + /// Configuration for the `misc` pool. #[serde(skip_serializing_if = "Option::is_none")] misc: Option>, }, diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index 6c255c63d1..cd0ebab053 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -208,11 +208,16 @@ impl RedisPool { } } +/// The various [`RedisPool`]s used within Relay. #[derive(Debug, Clone, Default)] pub struct RedisPools { + /// The pool used for project configurations pub project_configs: Option, + /// The pool used for cardinality limits. pub cardinality: Option, + /// The pool used for rate limiting/quotas. pub quotas: Option, + /// The pool used for metrics metadata. pub misc: Option, } From 713e1bfbb348bcc90b6015284837a0dd399df4b4 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 13:42:45 +0200 Subject: [PATCH 11/21] Noop version of RedisPools --- relay-config/src/redis.rs | 2 +- relay-redis/src/noop.rs | 13 +++++++++++++ relay-server/src/services/processor.rs | 3 +-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 3c72241de5..02b546f819 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -181,7 +181,7 @@ fn create_redis_pool( pub(super) fn create_redis_pools( configs: &RedisConfigs, cpu_concurrency: usize, -) -> Result { +) -> Result { match configs { RedisConfigs::Unified(cfg) => { let pool = create_redis_pool(cfg, cpu_concurrency)?; diff --git a/relay-redis/src/noop.rs b/relay-redis/src/noop.rs index 0bab77bdcf..b3cf26011a 100644 --- a/relay-redis/src/noop.rs +++ b/relay-redis/src/noop.rs @@ -29,3 +29,16 @@ impl RedisPool { Ok(Self) } } + +/// The various [`RedisPool`]s used within Relay. +#[derive(Debug, Clone, Default)] +pub struct RedisPools { + /// The pool used for project configurations + pub project_configs: Option, + /// The pool used for cardinality limits. + pub cardinality: Option, + /// The pool used for rate limiting/quotas. + pub quotas: Option, + /// The pool used for metrics metadata. + pub misc: Option, +} diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index c1e231ac12..a82ab0699e 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -37,7 +37,6 @@ use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::{Annotated, Value}; use relay_quotas::{DataCategory, Scoping}; -use relay_redis::RedisPools; use relay_sampling::config::RuleId; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; use relay_sampling::DynamicSamplingContext; @@ -60,7 +59,7 @@ use { relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups}, relay_metrics::RedisMetricMetaStore, relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter}, - relay_redis::RedisPool, + relay_redis::{RedisPool, RedisPools}, std::iter::Chain, std::slice::Iter, symbolic_unreal::{Unreal4Error, Unreal4ErrorKind}, From ac34785a9e5c5defa9244b422fcc4a075731d3c9 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Mon, 22 Jul 2024 17:57:55 +0200 Subject: [PATCH 12/21] Make individual pools non-optional --- relay-config/src/config.rs | 17 +-- relay-config/src/redis.rs | 151 +++++++++++++++---------- relay-redis/src/noop.rs | 10 +- relay-redis/src/real.rs | 10 +- relay-server/src/service.rs | 40 ++++++- relay-server/src/services/processor.rs | 32 +++--- relay-server/src/services/stats.rs | 21 ++-- relay-server/src/testutils.rs | 5 +- 8 files changed, 177 insertions(+), 109 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 70d7bc1c3e..3b1b514400 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -16,7 +16,6 @@ use relay_kafka::{ }; use relay_metrics::aggregator::{AggregatorConfig, FlushBatching}; use relay_metrics::MetricNamespace; -use relay_redis::{RedisError, RedisPools}; use serde::de::{DeserializeOwned, Unexpected, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use uuid::Uuid; @@ -24,7 +23,7 @@ use uuid::Uuid; use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig}; use crate::byte_size::ByteSize; use crate::upstream::UpstreamDescriptor; -use crate::{create_redis_pools, RedisConfig, RedisConfigs}; +use crate::{create_redis_pools, RedisConfig, RedisConfigs, RedisPoolConfigs}; const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10; @@ -2272,16 +2271,12 @@ impl Config { &self.values.processing.topics.unused } - /// Redis servers to connect to, for rate limiting. - pub fn redis(&self) -> Result { - if !self.processing_enabled() { - return Ok(RedisPools::default()); - } + /// Redis servers to connect to for project configs, cardinality limits, + /// rate limiting, and metrics metadata. + pub fn redis(&self) -> Option { + let redis_configs = self.values.processing.redis.as_ref()?; - let Some(redis_configs) = self.values.processing.redis.as_ref() else { - return Ok(RedisPools::default()); - }; - create_redis_pools(redis_configs, self.cpu_concurrency()) + Some(create_redis_pools(redis_configs, self.cpu_concurrency())) } /// Chunk size of attachments in bytes. diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 02b546f819..98e46317a6 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -1,4 +1,4 @@ -use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; +use relay_redis::RedisConfigOptions; use serde::{Deserialize, Serialize}; /// For small setups, `2 x limits.max_thread_count` does not leave enough headroom. @@ -139,24 +139,33 @@ pub enum RedisConfigs { /// Individual configurations for each pool. Individual { /// Configuration for the `project_configs` pool. - #[serde(skip_serializing_if = "Option::is_none")] - project_configs: Option>, + project_configs: Box, /// Configuration for the `cardinality` pool. - #[serde(skip_serializing_if = "Option::is_none")] - cardinality: Option>, + cardinality: Box, /// Configuration for the `quotas` pool. - #[serde(skip_serializing_if = "Option::is_none")] - quotas: Option>, + quotas: Box, /// Configuration for the `misc` pool. - #[serde(skip_serializing_if = "Option::is_none")] - misc: Option>, + misc: Box, }, } -fn create_redis_pool( +/// Helper struct bundling connections and options for the various Redis pools. +#[derive(Clone, Debug)] +pub struct RedisPoolConfigs<'a> { + /// Configuration for the `project_configs` pool. + pub project_configs: (&'a RedisConnection, RedisConfigOptions), + /// Configuration for the `cardinality` pool. + pub cardinality: (&'a RedisConnection, RedisConfigOptions), + /// Configuration for the `quotas` pool. + pub quotas: (&'a RedisConnection, RedisConfigOptions), + /// Configuration for the `misc` pool. + pub misc: (&'a RedisConnection, RedisConfigOptions), +} + +pub(super) fn create_redis_pool( config: &RedisConfig, cpu_concurrency: usize, -) -> Result { +) -> (&RedisConnection, RedisConfigOptions) { let options = RedisConfigOptions { max_connections: config .options @@ -170,27 +179,22 @@ fn create_redis_pool( write_timeout: config.options.write_timeout, }; - match &config.connection { - RedisConnection::Single(server) => RedisPool::single(server, options), - RedisConnection::Cluster(servers) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options) - } - } + (&config.connection, options) } pub(super) fn create_redis_pools( configs: &RedisConfigs, cpu_concurrency: usize, -) -> Result { +) -> RedisPoolConfigs { match configs { RedisConfigs::Unified(cfg) => { - let pool = create_redis_pool(cfg, cpu_concurrency)?; - Ok(RedisPools { - project_configs: Some(pool.clone()), - cardinality: Some(pool.clone()), - quotas: Some(pool.clone()), - misc: Some(pool), - }) + let pool = create_redis_pool(cfg, cpu_concurrency); + RedisPoolConfigs { + project_configs: pool.clone(), + cardinality: pool.clone(), + quotas: pool.clone(), + misc: pool, + } } RedisConfigs::Individual { project_configs, @@ -198,29 +202,17 @@ pub(super) fn create_redis_pools( quotas, misc, } => { - let project_configs = project_configs - .as_ref() - .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) - .transpose()?; - let cardinality = cardinality - .as_ref() - .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) - .transpose()?; - let quotas = quotas - .as_ref() - .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) - .transpose()?; - let misc = misc - .as_ref() - .map(|cfg| create_redis_pool(cfg, cpu_concurrency)) - .transpose()?; - - Ok(RedisPools { + let project_configs = create_redis_pool(project_configs, cpu_concurrency); + let cardinality = create_redis_pool(cardinality, cpu_concurrency); + let quotas = create_redis_pool(quotas, cpu_concurrency); + let misc = create_redis_pool(misc, cpu_concurrency); + + RedisPoolConfigs { project_configs, cardinality, quotas, misc, - }) + } } } } @@ -282,32 +274,41 @@ connection_timeout: 5 #[test] fn test_redis_individual() { let yaml = r#" -project_configs: +project_configs: server: "redis://127.0.0.1:6379" max_connections: 42 connection_timeout: 5 +cardinality: + server: "redis://127.0.0.1:6379" quotas: cluster_nodes: - "redis://127.0.0.1:6379" - "redis://127.0.0.2:6379" max_connections: 17 connection_timeout: 5 +misc: + cluster_nodes: + - "redis://127.0.0.1:6379" + - "redis://127.0.0.2:6379" "#; let configs: RedisConfigs = serde_yaml::from_str(yaml) .expect("Parsed processing redis configs: single with options"); let expected = RedisConfigs::Individual { - project_configs: Some(Box::new(RedisConfig { + project_configs: Box::new(RedisConfig { connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), options: PartialRedisConfigOptions { max_connections: Some(42), connection_timeout: 5, ..Default::default() }, - })), - cardinality: None, - quotas: Some(Box::new(RedisConfig { + }), + cardinality: Box::new(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: Default::default(), + }), + quotas: Box::new(RedisConfig { connection: RedisConnection::Cluster(vec![ "redis://127.0.0.1:6379".to_owned(), "redis://127.0.0.2:6379".to_owned(), @@ -317,11 +318,17 @@ quotas: connection_timeout: 5, ..Default::default() }, - })), - misc: None, + }), + misc: Box::new(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: Default::default(), + }), }; - assert_eq!(configs, expected,); + assert_eq!(configs, expected); } #[test] @@ -519,16 +526,19 @@ read_timeout: 10 #[test] fn test_redis_serialize_individual() { let configs = RedisConfigs::Individual { - project_configs: Some(Box::new(RedisConfig { + project_configs: Box::new(RedisConfig { connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), options: PartialRedisConfigOptions { max_connections: Some(42), connection_timeout: 5, ..Default::default() }, - })), - cardinality: None, - quotas: Some(Box::new(RedisConfig { + }), + cardinality: Box::new(RedisConfig { + connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), + options: Default::default(), + }), + quotas: Box::new(RedisConfig { connection: RedisConnection::Cluster(vec![ "redis://127.0.0.1:6379".to_owned(), "redis://127.0.0.2:6379".to_owned(), @@ -538,8 +548,14 @@ read_timeout: 10 connection_timeout: 5, ..Default::default() }, - })), - misc: None, + }), + misc: Box::new(RedisConfig { + connection: RedisConnection::Cluster(vec![ + "redis://127.0.0.1:6379".to_owned(), + "redis://127.0.0.2:6379".to_owned(), + ]), + options: Default::default(), + }), }; assert_json_snapshot!(configs, @r###" @@ -553,6 +569,14 @@ read_timeout: 10 "read_timeout": 3, "write_timeout": 3 }, + "cardinality": { + "server": "redis://127.0.0.1:6379", + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 + }, "quotas": { "cluster_nodes": [ "redis://127.0.0.1:6379", @@ -564,6 +588,17 @@ read_timeout: 10 "idle_timeout": 60, "read_timeout": 3, "write_timeout": 3 + }, + "misc": { + "cluster_nodes": [ + "redis://127.0.0.1:6379", + "redis://127.0.0.2:6379" + ], + "connection_timeout": 5, + "max_lifetime": 300, + "idle_timeout": 60, + "read_timeout": 3, + "write_timeout": 3 } } "###); diff --git a/relay-redis/src/noop.rs b/relay-redis/src/noop.rs index b3cf26011a..db08723155 100644 --- a/relay-redis/src/noop.rs +++ b/relay-redis/src/noop.rs @@ -31,14 +31,14 @@ impl RedisPool { } /// The various [`RedisPool`]s used within Relay. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct RedisPools { /// The pool used for project configurations - pub project_configs: Option, + pub project_configs: RedisPool, /// The pool used for cardinality limits. - pub cardinality: Option, + pub cardinality: RedisPool, /// The pool used for rate limiting/quotas. - pub quotas: Option, + pub quotas: RedisPool, /// The pool used for metrics metadata. - pub misc: Option, + pub misc: RedisPool, } diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index cd0ebab053..f5514131e7 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -209,16 +209,16 @@ impl RedisPool { } /// The various [`RedisPool`]s used within Relay. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct RedisPools { /// The pool used for project configurations - pub project_configs: Option, + pub project_configs: RedisPool, /// The pool used for cardinality limits. - pub cardinality: Option, + pub cardinality: RedisPool, /// The pool used for rate limiting/quotas. - pub quotas: Option, + pub quotas: RedisPool, /// The pool used for metrics metadata. - pub misc: Option, + pub misc: RedisPool, } /// Stats about how the [`RedisPool`] is performing. diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 98e1791740..2ead0b8f0e 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -10,7 +10,8 @@ use axum::extract::FromRequestParts; use axum::http::request::Parts; use rayon::ThreadPool; use relay_cogs::Cogs; -use relay_config::Config; +use relay_config::{Config, RedisConnection, RedisPoolConfigs}; +use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; @@ -133,7 +134,12 @@ impl ServiceState { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); - let redis_pools = config.redis().context(ServiceError::Redis)?; + let redis_pools = config + .redis() + .filter(|_| config.processing_enabled()) + .map(create_redis_pools) + .transpose() + .context(ServiceError::Redis)?; let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size())); @@ -227,7 +233,9 @@ impl ServiceState { buffer_guard.clone(), project_cache_services, metric_outcomes, - redis_pools.project_configs.clone(), + redis_pools + .as_ref() + .map(|pools| pools.project_configs.clone()), ) .spawn_handler(project_cache_rx); @@ -332,6 +340,32 @@ impl ServiceState { } } +fn create_redis_pool( + connection: &RedisConnection, + options: RedisConfigOptions, +) -> Result { + match connection { + RedisConnection::Cluster(servers) => { + RedisPool::cluster(servers.iter().map(|s| s.as_str()), options) + } + RedisConnection::Single(server) => RedisPool::single(server, options), + } +} + +pub fn create_redis_pools(configs: RedisPoolConfigs) -> Result { + let project_configs = create_redis_pool(configs.project_configs.0, configs.project_configs.1)?; + let cardinality = create_redis_pool(configs.cardinality.0, configs.cardinality.1)?; + let quotas = create_redis_pool(configs.quotas.0, configs.quotas.1)?; + let misc = create_redis_pool(configs.misc.0, configs.misc.1)?; + + Ok(RedisPools { + project_configs, + cardinality, + quotas, + misc, + }) +} + #[axum::async_trait] impl FromRequestParts for ServiceState { type Rejection = Infallible; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index a82ab0699e..ddb4701f26 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1137,7 +1137,7 @@ impl EnvelopeProcessorService { config: Arc, global_config: GlobalConfigHandle, cogs: Cogs, - #[cfg(feature = "processing")] redis: RedisPools, + #[cfg(feature = "processing")] redis: Option, addrs: Addrs, metric_outcomes: MetricOutcomes, ) -> Self { @@ -1151,35 +1151,41 @@ impl EnvelopeProcessorService { } }); + #[cfg(feature = "processing")] + let (cardinality, quotas, misc) = match redis { + Some(RedisPools { + cardinality, + quotas, + misc, + .. + }) => (Some(cardinality), Some(quotas), Some(misc)), + None => (None, None, None), + }; + let inner = InnerProcessor { pool, global_config, cogs, #[cfg(feature = "processing")] // TODO: Tentatively using `quotas` for this - redis_pool: redis.quotas.clone(), + redis_pool: quotas.clone(), #[cfg(feature = "processing")] - rate_limiter: redis - .quotas - .clone() - .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())), + rate_limiter: quotas.map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())), addrs, geoip_lookup, #[cfg(feature = "processing")] - metric_meta_store: redis.misc.clone().map(|pool| { - RedisMetricMetaStore::new(pool, config.metrics_meta_locations_expiry()) + metric_meta_store: misc.map(|misc| { + RedisMetricMetaStore::new(misc, config.metrics_meta_locations_expiry()) }), #[cfg(feature = "processing")] - cardinality_limiter: redis - .cardinality - .clone() - .map(|pool| { + cardinality_limiter: cardinality + .map(|cardinality| { RedisSetLimiter::new( RedisSetLimiterOptions { cache_vacuum_interval: config .cardinality_limiter_cache_vacuum_interval(), }, - pool, + cardinality, ) }) .map(CardinalityLimiter::new), diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index cb2460a7b7..8170a03c97 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -17,14 +17,14 @@ pub struct RelayStats { config: Arc, upstream_relay: Addr, #[cfg(feature = "processing")] - redis_pools: RedisPools, + redis_pools: Option, } impl RelayStats { pub fn new( config: Arc, upstream_relay: Addr, - #[cfg(feature = "processing")] redis_pools: RedisPools, + #[cfg(feature = "processing")] redis_pools: Option, ) -> Self { Self { config, @@ -118,19 +118,16 @@ impl RelayStats { #[cfg(feature = "processing")] async fn redis_pools(&self) { - if let Some(ref project_configs) = self.redis_pools.project_configs { + if let Some(RedisPools { + project_configs, + cardinality, + quotas, + misc, + }) = self.redis_pools.as_ref() + { Self::redis_pool(project_configs, "project_configs").await; - } - - if let Some(ref cardinality) = self.redis_pools.cardinality { Self::redis_pool(cardinality, "cardinality").await; - } - - if let Some(ref quotas) = self.redis_pools.quotas { Self::redis_pool(quotas, "quotas").await; - } - - if let Some(ref misc) = self.redis_pools.misc { Self::redis_pool(misc, "misc").await; } } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 1d034f0723..2f515f638f 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -15,6 +15,7 @@ use relay_test::mock_service; use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::metrics::{MetricOutcomes, MetricStats}; +use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; @@ -120,7 +121,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = "processing")] - let redis_pools = config.redis().unwrap(); + let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); @@ -149,7 +150,7 @@ pub fn create_test_processor_with_addrs( addrs: processor::Addrs, ) -> EnvelopeProcessorService { #[cfg(feature = "processing")] - let redis_pools = config.redis().unwrap(); + let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap(); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone()); From 30e5214128ac37e34c5ccb9bdd0acec4c7fe259c Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 12:05:55 +0200 Subject: [PATCH 13/21] Changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc75ed71e0..2f9734aa7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ **Features**: - "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825)) +- Redis pools for `project_configs`, `cardinality`, `quotas`, and `misc` usecases + can now be configured individually. ([#3843](https://github.com/getsentry/relay/pull/3843)) **Internal**: From 25f52a76131ec4c26e6720192525e828c0bf4bf2 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 14:33:41 +0200 Subject: [PATCH 14/21] Make RelayStats::RedisPool not async --- relay-server/src/services/stats.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 8170a03c97..f233998b09 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -101,7 +101,7 @@ impl RelayStats { } #[cfg(feature = "processing")] - async fn redis_pool(redis_pool: &RedisPool, name: &str) { + fn redis_pool(redis_pool: &RedisPool, name: &str) { let state = redis_pool.stats(); metric!( gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections), @@ -125,10 +125,10 @@ impl RelayStats { misc, }) = self.redis_pools.as_ref() { - Self::redis_pool(project_configs, "project_configs").await; - Self::redis_pool(cardinality, "cardinality").await; - Self::redis_pool(quotas, "quotas").await; - Self::redis_pool(misc, "misc").await; + Self::redis_pool(project_configs, "project_configs"); + Self::redis_pool(cardinality, "cardinality"); + Self::redis_pool(quotas, "quotas"); + Self::redis_pool(misc, "misc"); } } } From b9af5c07721d1bcdb2a0501bf3c141003b48ee0f Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 14:36:36 +0200 Subject: [PATCH 15/21] Rename InnerProcessor::redis_pool to quotas_pool --- relay-server/src/services/processor.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index ddb4701f26..024034573d 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1118,7 +1118,7 @@ struct InnerProcessor { global_config: GlobalConfigHandle, cogs: Cogs, #[cfg(feature = "processing")] - redis_pool: Option, + quotas_pool: Option, addrs: Addrs, #[cfg(feature = "processing")] rate_limiter: Option, @@ -1167,8 +1167,7 @@ impl EnvelopeProcessorService { global_config, cogs, #[cfg(feature = "processing")] - // TODO: Tentatively using `quotas` for this - redis_pool: quotas.clone(), + quotas_pool: quotas.clone(), #[cfg(feature = "processing")] rate_limiter: quotas.map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())), addrs, @@ -1259,9 +1258,9 @@ impl EnvelopeProcessorService { #[allow(unused_mut)] let mut reservoir = ReservoirEvaluator::new(reservoir_counters); #[cfg(feature = "processing")] - if let Some(redis_pool) = self.inner.redis_pool.as_ref() { + if let Some(quotas_pool) = self.inner.quotas_pool.as_ref() { let org_id = managed_envelope.scoping().organization_id; - reservoir.set_redis(org_id, redis_pool); + reservoir.set_redis(org_id, quotas_pool); } let extracted_metrics = ProcessingExtractedMetrics::new( From 9cd9b0acf1a3d979de04d596a0e5dc8c7f8e3e60 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 14:38:14 +0200 Subject: [PATCH 16/21] Tuple arguments on create_redis_pool --- relay-server/src/service.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 2ead0b8f0e..865caa1980 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -341,8 +341,8 @@ impl ServiceState { } fn create_redis_pool( - connection: &RedisConnection, - options: RedisConfigOptions, + (connection, options): (&RedisConnection, + RedisConfigOptions), ) -> Result { match connection { RedisConnection::Cluster(servers) => { @@ -353,10 +353,10 @@ fn create_redis_pool( } pub fn create_redis_pools(configs: RedisPoolConfigs) -> Result { - let project_configs = create_redis_pool(configs.project_configs.0, configs.project_configs.1)?; - let cardinality = create_redis_pool(configs.cardinality.0, configs.cardinality.1)?; - let quotas = create_redis_pool(configs.quotas.0, configs.quotas.1)?; - let misc = create_redis_pool(configs.misc.0, configs.misc.1)?; + let project_configs = create_redis_pool(configs.project_configs)?; + let cardinality = create_redis_pool(configs.cardinality)?; + let quotas = create_redis_pool(configs.quotas)?; + let misc = create_redis_pool(configs.misc)?; Ok(RedisPools { project_configs, From 346d3a6c33bf6f330763f133417338f2367433b6 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 16:06:08 +0200 Subject: [PATCH 17/21] Refactor create_redis_pool(s) --- relay-config/src/redis.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 98e46317a6..3145fad697 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -164,14 +164,13 @@ pub struct RedisPoolConfigs<'a> { pub(super) fn create_redis_pool( config: &RedisConfig, - cpu_concurrency: usize, + default_connections: usize, ) -> (&RedisConnection, RedisConfigOptions) { let options = RedisConfigOptions { max_connections: config .options .max_connections - .unwrap_or(cpu_concurrency as u32 * 2) - .min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), + .unwrap_or(default_connections), connection_timeout: config.options.connection_timeout, max_lifetime: config.options.max_lifetime, idle_timeout: config.options.idle_timeout, @@ -188,7 +187,9 @@ pub(super) fn create_redis_pools( ) -> RedisPoolConfigs { match configs { RedisConfigs::Unified(cfg) => { - let pool = create_redis_pool(cfg, cpu_concurrency); + let default_connections = + (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS as usize); + let pool = create_redis_pool(cfg, default_connections); RedisPoolConfigs { project_configs: pool.clone(), cardinality: pool.clone(), @@ -202,11 +203,13 @@ pub(super) fn create_redis_pools( quotas, misc, } => { - let project_configs = create_redis_pool(project_configs, cpu_concurrency); + let project_configs = create_redis_pool( + project_configs, + (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS as usize), + ); let cardinality = create_redis_pool(cardinality, cpu_concurrency); let quotas = create_redis_pool(quotas, cpu_concurrency); let misc = create_redis_pool(misc, cpu_concurrency); - RedisPoolConfigs { project_configs, cardinality, From b6e0d02f5458e343b315b639c93dff45aaa795df Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 16:23:08 +0200 Subject: [PATCH 18/21] Fix types --- relay-config/src/config.rs | 5 ++++- relay-config/src/redis.rs | 11 ++++------- relay-server/src/service.rs | 3 +-- relay-server/src/services/processor.rs | 3 ++- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 17cf96062f..292d318399 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -2283,7 +2283,10 @@ impl Config { pub fn redis(&self) -> Option { let redis_configs = self.values.processing.redis.as_ref()?; - Some(create_redis_pools(redis_configs, self.cpu_concurrency())) + Some(create_redis_pools( + redis_configs, + self.cpu_concurrency() as u32, + )) } /// Chunk size of attachments in bytes. diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 3145fad697..b5cc0ef9d2 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -164,7 +164,7 @@ pub struct RedisPoolConfigs<'a> { pub(super) fn create_redis_pool( config: &RedisConfig, - default_connections: usize, + default_connections: u32, ) -> (&RedisConnection, RedisConfigOptions) { let options = RedisConfigOptions { max_connections: config @@ -181,14 +181,11 @@ pub(super) fn create_redis_pool( (&config.connection, options) } -pub(super) fn create_redis_pools( - configs: &RedisConfigs, - cpu_concurrency: usize, -) -> RedisPoolConfigs { +pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs { match configs { RedisConfigs::Unified(cfg) => { let default_connections = - (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS as usize); + (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS); let pool = create_redis_pool(cfg, default_connections); RedisPoolConfigs { project_configs: pool.clone(), @@ -205,7 +202,7 @@ pub(super) fn create_redis_pools( } => { let project_configs = create_redis_pool( project_configs, - (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS as usize), + (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), ); let cardinality = create_redis_pool(cardinality, cpu_concurrency); let quotas = create_redis_pool(quotas, cpu_concurrency); diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index e20536a841..4b45fcdf67 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -363,8 +363,7 @@ impl ServiceState { } fn create_redis_pool( - (connection, options): (&RedisConnection, - RedisConfigOptions), + (connection, options): (&RedisConnection, RedisConfigOptions), ) -> Result { match connection { RedisConnection::Cluster(servers) => { diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 2430d4d0f3..5a63f07ec9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1169,7 +1169,8 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] quotas_pool: quotas.clone(), #[cfg(feature = "processing")] - rate_limiter: quotas.map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())), + rate_limiter: quotas + .map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())), addrs, geoip_lookup, #[cfg(feature = "processing")] From c64f9db1abd16015e9c33a27b1daa0e1c0006c04 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 23 Jul 2024 16:41:14 +0200 Subject: [PATCH 19/21] Fix error --- relay-server/src/services/project_cache.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 910606b265..ddc4b1734c 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -442,7 +442,9 @@ impl ProjectSource { UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); #[cfg(feature = "processing")] - let redis_maxconns = config.redis().map(|(_, config)| config.max_connections); + let redis_maxconns = config + .redis() + .map(|configs| configs.project_configs.1.max_connections); #[cfg(feature = "processing")] let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); From ffd0235c76c655e6d56ab7ac4bae0320152515b6 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 24 Jul 2024 10:01:24 +0200 Subject: [PATCH 20/21] Fix default connections --- relay-config/src/redis.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index b5cc0ef9d2..29fba9d0cf 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -182,11 +182,15 @@ pub(super) fn create_redis_pool( } pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs { + // Default `max_connections` for the `project_configs` pool. + // In a unified config, this is used for all pools. + let project_configs_default_connections = std::cmp::max( + cpu_concurrency * 2, + crate::redis::DEFAULT_MIN_MAX_CONNECTIONS, + ); match configs { RedisConfigs::Unified(cfg) => { - let default_connections = - (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS); - let pool = create_redis_pool(cfg, default_connections); + let pool = create_redis_pool(cfg, project_configs_default_connections); RedisPoolConfigs { project_configs: pool.clone(), cardinality: pool.clone(), @@ -200,10 +204,8 @@ pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) - quotas, misc, } => { - let project_configs = create_redis_pool( - project_configs, - (cpu_concurrency * 2).min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), - ); + let project_configs = + create_redis_pool(project_configs, project_configs_default_connections); let cardinality = create_redis_pool(cardinality, cpu_concurrency); let quotas = create_redis_pool(quotas, cpu_concurrency); let misc = create_redis_pool(misc, cpu_concurrency); From c985b37546858e26a78dfc97d4d1a77686a5ffe5 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 24 Jul 2024 10:26:06 +0200 Subject: [PATCH 21/21] Add a cfg annotation --- relay-server/src/testutils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 2f515f638f..f595b30096 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -15,6 +15,7 @@ use relay_test::mock_service; use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::metrics::{MetricOutcomes, MetricStats}; +#[cfg(feature = "processing")] use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome;