From b91b3e91fc14b9169a747a79722c017982202a61 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 24 Jul 2024 14:57:08 +0200 Subject: [PATCH] Revert "feat(redis): Allow configuring Redis pools individually (#3843)" This reverts commit 4594b395418e585d46df8a119e02e6e76b57f92a. --- CHANGELOG.md | 2 - relay-config/src/config.rs | 34 ++- relay-config/src/redis.rs | 338 +-------------------- relay-redis/src/noop.rs | 13 - relay-redis/src/real.rs | 13 - relay-server/src/service.rs | 46 +-- relay-server/src/services/processor.rs | 39 +-- relay-server/src/services/project_cache.rs | 4 +- relay-server/src/services/stats.rs | 46 +-- relay-server/src/testutils.rs | 34 ++- 10 files changed, 100 insertions(+), 469 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a9c604ac9..7d14ee3642 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,6 @@ - "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825)) - Derive span browser name from user agent. ([#3834](https://github.com/getsentry/relay/pull/3834)) -- Redis pools for `project_configs`, `cardinality`, `quotas`, and `misc` usecases - can now be configured individually. ([#3843](https://github.com/getsentry/relay/pull/3843)) **Internal**: diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 292d318399..e9a6864952 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -16,6 +16,7 @@ use relay_kafka::{ }; use relay_metrics::aggregator::{AggregatorConfig, FlushBatching}; use relay_metrics::MetricNamespace; +use relay_redis::RedisConfigOptions; use serde::de::{DeserializeOwned, Unexpected, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use uuid::Uuid; @@ -23,7 +24,7 @@ use uuid::Uuid; use crate::aggregator::{AggregatorServiceConfig, ScopedAggregatorConfig}; use crate::byte_size::ByteSize; use crate::upstream::UpstreamDescriptor; -use crate::{create_redis_pools, RedisConfig, RedisConfigs, RedisPoolConfigs}; +use crate::{RedisConfig, RedisConnection}; const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10; @@ -1023,7 +1024,7 @@ pub struct Processing { pub kafka_validate_topics: bool, /// Redis hosts to connect to for storing state for rate limits. #[serde(default)] - pub redis: Option, + pub redis: Option, /// Maximum chunk size of attachments for Kafka. #[serde(default = "default_chunk_size")] pub attachment_chunk_size: ByteSize, @@ -1584,7 +1585,7 @@ impl Config { } if let Some(redis) = overrides.redis_url { - processing.redis = Some(RedisConfigs::Unified(RedisConfig::single(redis))) + processing.redis = Some(RedisConfig::single(redis)) } if let Some(kafka_url) = overrides.kafka_url { @@ -2278,15 +2279,26 @@ impl Config { &self.values.processing.topics.unused } - /// Redis servers to connect to for project configs, cardinality limits, - /// rate limiting, and metrics metadata. - pub fn redis(&self) -> Option { - let redis_configs = self.values.processing.redis.as_ref()?; + /// Redis servers to connect to, for rate limiting. + pub fn redis(&self) -> Option<(&RedisConnection, RedisConfigOptions)> { + let cpu_concurrency = self.cpu_concurrency(); - Some(create_redis_pools( - redis_configs, - self.cpu_concurrency() as u32, - )) + let redis = self.values.processing.redis.as_ref()?; + + let options = RedisConfigOptions { + max_connections: redis + .options + .max_connections + .unwrap_or(cpu_concurrency as u32 * 2) + .min(crate::redis::DEFAULT_MIN_MAX_CONNECTIONS), + connection_timeout: redis.options.connection_timeout, + max_lifetime: redis.options.max_lifetime, + idle_timeout: redis.options.idle_timeout, + read_timeout: redis.options.read_timeout, + write_timeout: redis.options.write_timeout, + }; + + Some((&redis.connection, options)) } /// Chunk size of attachments in bytes. diff --git a/relay-config/src/redis.rs b/relay-config/src/redis.rs index 29fba9d0cf..437d1e5b15 100644 --- a/relay-config/src/redis.rs +++ b/relay-config/src/redis.rs @@ -1,4 +1,3 @@ -use relay_redis::RedisConfigOptions; use serde::{Deserialize, Serialize}; /// For small setups, `2 x limits.max_thread_count` does not leave enough headroom. @@ -87,8 +86,7 @@ pub enum RedisConnection { } /// Configuration for connecting a redis client. -#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] -#[serde(from = "RedisConfigFromFile")] +#[derive(Clone, Debug, Serialize, Eq, PartialEq)] pub struct RedisConfig { /// Redis connection info. #[serde(flatten)] @@ -130,92 +128,12 @@ impl From for RedisConfig { } } -/// Configurations for the various Redis pools used by Relay. -#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] -#[serde(untagged)] -pub enum RedisConfigs { - /// All pools should be configured the same way. - Unified(RedisConfig), - /// Individual configurations for each pool. - Individual { - /// Configuration for the `project_configs` pool. - project_configs: Box, - /// Configuration for the `cardinality` pool. - cardinality: Box, - /// Configuration for the `quotas` pool. - quotas: Box, - /// Configuration for the `misc` pool. - misc: Box, - }, -} - -/// Helper struct bundling connections and options for the various Redis pools. -#[derive(Clone, Debug)] -pub struct RedisPoolConfigs<'a> { - /// Configuration for the `project_configs` pool. - pub project_configs: (&'a RedisConnection, RedisConfigOptions), - /// Configuration for the `cardinality` pool. - pub cardinality: (&'a RedisConnection, RedisConfigOptions), - /// Configuration for the `quotas` pool. - pub quotas: (&'a RedisConnection, RedisConfigOptions), - /// Configuration for the `misc` pool. - pub misc: (&'a RedisConnection, RedisConfigOptions), -} - -pub(super) fn create_redis_pool( - config: &RedisConfig, - default_connections: u32, -) -> (&RedisConnection, RedisConfigOptions) { - let options = RedisConfigOptions { - max_connections: config - .options - .max_connections - .unwrap_or(default_connections), - connection_timeout: config.options.connection_timeout, - max_lifetime: config.options.max_lifetime, - idle_timeout: config.options.idle_timeout, - read_timeout: config.options.read_timeout, - write_timeout: config.options.write_timeout, - }; - - (&config.connection, options) -} - -pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs { - // Default `max_connections` for the `project_configs` pool. - // In a unified config, this is used for all pools. - let project_configs_default_connections = std::cmp::max( - cpu_concurrency * 2, - crate::redis::DEFAULT_MIN_MAX_CONNECTIONS, - ); - match configs { - RedisConfigs::Unified(cfg) => { - let pool = create_redis_pool(cfg, project_configs_default_connections); - RedisPoolConfigs { - project_configs: pool.clone(), - cardinality: pool.clone(), - quotas: pool.clone(), - misc: pool, - } - } - RedisConfigs::Individual { - project_configs, - cardinality, - quotas, - misc, - } => { - let project_configs = - create_redis_pool(project_configs, project_configs_default_connections); - let cardinality = create_redis_pool(cardinality, cpu_concurrency); - let quotas = create_redis_pool(quotas, cpu_concurrency); - let misc = create_redis_pool(misc, cpu_concurrency); - RedisPoolConfigs { - project_configs, - cardinality, - quotas, - misc, - } - } +impl<'de> Deserialize<'de> for RedisConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + RedisConfigFromFile::deserialize(deserializer).map(Into::into) } } @@ -249,90 +167,6 @@ connection_timeout: 5 ); } - #[test] - fn test_redis_single_opts_unified() { - let yaml = r#" -server: "redis://127.0.0.1:6379" -max_connections: 42 -connection_timeout: 5 -"#; - - let config: RedisConfigs = serde_yaml::from_str(yaml) - .expect("Parsed processing redis config: single with options"); - - assert_eq!( - config, - RedisConfigs::Unified(RedisConfig { - connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), - options: PartialRedisConfigOptions { - max_connections: Some(42), - connection_timeout: 5, - ..Default::default() - } - }) - ); - } - - #[test] - fn test_redis_individual() { - let yaml = r#" -project_configs: - server: "redis://127.0.0.1:6379" - max_connections: 42 - connection_timeout: 5 -cardinality: - server: "redis://127.0.0.1:6379" -quotas: - cluster_nodes: - - "redis://127.0.0.1:6379" - - "redis://127.0.0.2:6379" - max_connections: 17 - connection_timeout: 5 -misc: - cluster_nodes: - - "redis://127.0.0.1:6379" - - "redis://127.0.0.2:6379" -"#; - - let configs: RedisConfigs = serde_yaml::from_str(yaml) - .expect("Parsed processing redis configs: single with options"); - - let expected = RedisConfigs::Individual { - project_configs: Box::new(RedisConfig { - connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), - options: PartialRedisConfigOptions { - max_connections: Some(42), - connection_timeout: 5, - ..Default::default() - }, - }), - cardinality: Box::new(RedisConfig { - connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), - options: Default::default(), - }), - quotas: Box::new(RedisConfig { - connection: RedisConnection::Cluster(vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ]), - options: PartialRedisConfigOptions { - max_connections: Some(17), - connection_timeout: 5, - ..Default::default() - }, - }), - misc: Box::new(RedisConfig { - connection: RedisConnection::Cluster(vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ]), - options: Default::default(), - }), - }; - - assert_eq!(configs, expected); - } - #[test] fn test_redis_single_serialize() { let config = RedisConfig { @@ -355,28 +189,6 @@ misc: "###); } - #[test] - fn test_redis_single_serialize_unified() { - let configs = RedisConfigs::Unified(RedisConfig { - connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), - options: PartialRedisConfigOptions { - connection_timeout: 5, - ..Default::default() - }, - }); - - assert_json_snapshot!(configs, @r###" - { - "server": "redis://127.0.0.1:6379", - "connection_timeout": 5, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 3, - "write_timeout": 3 - } - "###); - } - #[test] fn test_redis_single_opts_default() { let yaml = r#" @@ -442,33 +254,6 @@ read_timeout: 10 ); } - #[test] - fn test_redis_cluster_nodes_opts_unified() { - let yaml = r#" -cluster_nodes: - - "redis://127.0.0.1:6379" - - "redis://127.0.0.2:6379" -read_timeout: 10 -"#; - - let config: RedisConfigs = serde_yaml::from_str(yaml) - .expect("Parsed processing redis config: single with options"); - - assert_eq!( - config, - RedisConfigs::Unified(RedisConfig { - connection: RedisConnection::Cluster(vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned() - ]), - options: PartialRedisConfigOptions { - read_timeout: 10, - ..Default::default() - }, - }) - ); - } - #[test] fn test_redis_cluster_serialize() { let config = RedisConfig { @@ -496,113 +281,4 @@ read_timeout: 10 } "###); } - - #[test] - fn test_redis_cluster_serialize_unified() { - let configs = RedisConfigs::Unified(RedisConfig { - connection: RedisConnection::Cluster(vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ]), - options: PartialRedisConfigOptions { - read_timeout: 33, - ..Default::default() - }, - }); - - assert_json_snapshot!(configs, @r###" - { - "cluster_nodes": [ - "redis://127.0.0.1:6379", - "redis://127.0.0.2:6379" - ], - "connection_timeout": 5, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 33, - "write_timeout": 3 - } - "###); - } - - #[test] - fn test_redis_serialize_individual() { - let configs = RedisConfigs::Individual { - project_configs: Box::new(RedisConfig { - connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), - options: PartialRedisConfigOptions { - max_connections: Some(42), - connection_timeout: 5, - ..Default::default() - }, - }), - cardinality: Box::new(RedisConfig { - connection: RedisConnection::Single("redis://127.0.0.1:6379".to_owned()), - options: Default::default(), - }), - quotas: Box::new(RedisConfig { - connection: RedisConnection::Cluster(vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ]), - options: PartialRedisConfigOptions { - max_connections: Some(17), - connection_timeout: 5, - ..Default::default() - }, - }), - misc: Box::new(RedisConfig { - connection: RedisConnection::Cluster(vec![ - "redis://127.0.0.1:6379".to_owned(), - "redis://127.0.0.2:6379".to_owned(), - ]), - options: Default::default(), - }), - }; - - assert_json_snapshot!(configs, @r###" - { - "project_configs": { - "server": "redis://127.0.0.1:6379", - "max_connections": 42, - "connection_timeout": 5, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 3, - "write_timeout": 3 - }, - "cardinality": { - "server": "redis://127.0.0.1:6379", - "connection_timeout": 5, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 3, - "write_timeout": 3 - }, - "quotas": { - "cluster_nodes": [ - "redis://127.0.0.1:6379", - "redis://127.0.0.2:6379" - ], - "max_connections": 17, - "connection_timeout": 5, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 3, - "write_timeout": 3 - }, - "misc": { - "cluster_nodes": [ - "redis://127.0.0.1:6379", - "redis://127.0.0.2:6379" - ], - "connection_timeout": 5, - "max_lifetime": 300, - "idle_timeout": 60, - "read_timeout": 3, - "write_timeout": 3 - } - } - "###); - } } diff --git a/relay-redis/src/noop.rs b/relay-redis/src/noop.rs index db08723155..0bab77bdcf 100644 --- a/relay-redis/src/noop.rs +++ b/relay-redis/src/noop.rs @@ -29,16 +29,3 @@ impl RedisPool { Ok(Self) } } - -/// The various [`RedisPool`]s used within Relay. -#[derive(Debug, Clone)] -pub struct RedisPools { - /// The pool used for project configurations - pub project_configs: RedisPool, - /// The pool used for cardinality limits. - pub cardinality: RedisPool, - /// The pool used for rate limiting/quotas. - pub quotas: RedisPool, - /// The pool used for metrics metadata. - pub misc: RedisPool, -} diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index f5514131e7..6b4f129940 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -208,19 +208,6 @@ impl RedisPool { } } -/// The various [`RedisPool`]s used within Relay. -#[derive(Debug, Clone)] -pub struct RedisPools { - /// The pool used for project configurations - pub project_configs: RedisPool, - /// The pool used for cardinality limits. - pub cardinality: RedisPool, - /// The pool used for rate limiting/quotas. - pub quotas: RedisPool, - /// The pool used for metrics metadata. - pub misc: RedisPool, -} - /// Stats about how the [`RedisPool`] is performing. pub struct Stats { /// The number of connections currently being managed by the pool. diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 4b45fcdf67..4f40ce73d0 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -10,8 +10,8 @@ use axum::extract::FromRequestParts; use axum::http::request::Parts; use rayon::ThreadPool; use relay_cogs::Cogs; -use relay_config::{Config, RedisConnection, RedisPoolConfigs}; -use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; +use relay_config::{Config, RedisConnection}; +use relay_redis::RedisPool; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; @@ -153,10 +153,15 @@ impl ServiceState { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); - let redis_pools = config + let redis_pool = config .redis() .filter(|_| config.processing_enabled()) - .map(create_redis_pools) + .map(|redis| match redis { + (RedisConnection::Single(server), options) => RedisPool::single(server, options), + (RedisConnection::Cluster(servers), options) => { + RedisPool::cluster(servers.iter().map(|s| s.as_str()), options) + } + }) .transpose() .context(ServiceError::Redis)?; @@ -227,7 +232,7 @@ impl ServiceState { global_config_handle, cogs, #[cfg(feature = "processing")] - redis_pools.clone(), + redis_pool.clone(), processor::Addrs { project_cache: project_cache.clone(), outcome_aggregator: outcome_aggregator.clone(), @@ -255,9 +260,7 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_services, metric_outcomes, - redis_pools - .as_ref() - .map(|pools| pools.project_configs.clone()), + redis_pool.clone(), ) .spawn_handler(project_cache_rx); @@ -274,7 +277,7 @@ impl ServiceState { config.clone(), upstream_relay.clone(), #[cfg(feature = "processing")] - redis_pools.clone(), + redis_pool, ) .start(); @@ -362,31 +365,6 @@ impl ServiceState { } } -fn create_redis_pool( - (connection, options): (&RedisConnection, RedisConfigOptions), -) -> Result { - match connection { - RedisConnection::Cluster(servers) => { - RedisPool::cluster(servers.iter().map(|s| s.as_str()), options) - } - RedisConnection::Single(server) => RedisPool::single(server, options), - } -} - -pub fn create_redis_pools(configs: RedisPoolConfigs) -> Result { - let project_configs = create_redis_pool(configs.project_configs)?; - let cardinality = create_redis_pool(configs.cardinality)?; - let quotas = create_redis_pool(configs.quotas)?; - let misc = create_redis_pool(configs.misc)?; - - Ok(RedisPools { - project_configs, - cardinality, - quotas, - misc, - }) -} - #[axum::async_trait] impl FromRequestParts for ServiceState { type Rejection = Infallible; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 273fafe69f..6059347562 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -58,7 +58,7 @@ use { relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups}, relay_metrics::RedisMetricMetaStore, relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter}, - relay_redis::{RedisPool, RedisPools}, + relay_redis::RedisPool, std::iter::Chain, std::slice::Iter, symbolic_unreal::{Unreal4Error, Unreal4ErrorKind}, @@ -1118,7 +1118,7 @@ struct InnerProcessor { global_config: GlobalConfigHandle, cogs: Cogs, #[cfg(feature = "processing")] - quotas_pool: Option, + redis_pool: Option, addrs: Addrs, #[cfg(feature = "processing")] rate_limiter: Option, @@ -1137,7 +1137,7 @@ impl EnvelopeProcessorService { config: Arc, global_config: GlobalConfigHandle, cogs: Cogs, - #[cfg(feature = "processing")] redis: Option, + #[cfg(feature = "processing")] redis: Option, addrs: Addrs, metric_outcomes: MetricOutcomes, ) -> Self { @@ -1151,41 +1151,32 @@ impl EnvelopeProcessorService { } }); - #[cfg(feature = "processing")] - let (cardinality, quotas, misc) = match redis { - Some(RedisPools { - cardinality, - quotas, - misc, - .. - }) => (Some(cardinality), Some(quotas), Some(misc)), - None => (None, None, None), - }; - let inner = InnerProcessor { workers: WorkerGroup::new(pool), global_config, cogs, #[cfg(feature = "processing")] - quotas_pool: quotas.clone(), + redis_pool: redis.clone(), #[cfg(feature = "processing")] - rate_limiter: quotas - .map(|quotas| RedisRateLimiter::new(quotas).max_limit(config.max_rate_limit())), + rate_limiter: redis + .clone() + .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())), addrs, geoip_lookup, #[cfg(feature = "processing")] - metric_meta_store: misc.map(|misc| { - RedisMetricMetaStore::new(misc, config.metrics_meta_locations_expiry()) + metric_meta_store: redis.clone().map(|pool| { + RedisMetricMetaStore::new(pool, config.metrics_meta_locations_expiry()) }), #[cfg(feature = "processing")] - cardinality_limiter: cardinality - .map(|cardinality| { + cardinality_limiter: redis + .clone() + .map(|pool| { RedisSetLimiter::new( RedisSetLimiterOptions { cache_vacuum_interval: config .cardinality_limiter_cache_vacuum_interval(), }, - cardinality, + pool, ) }) .map(CardinalityLimiter::new), @@ -1259,9 +1250,9 @@ impl EnvelopeProcessorService { #[allow(unused_mut)] let mut reservoir = ReservoirEvaluator::new(reservoir_counters); #[cfg(feature = "processing")] - if let Some(quotas_pool) = self.inner.quotas_pool.as_ref() { + if let Some(redis_pool) = self.inner.redis_pool.as_ref() { let org_id = managed_envelope.scoping().organization_id; - reservoir.set_redis(org_id, quotas_pool); + reservoir.set_redis(org_id, redis_pool); } let extracted_metrics = ProcessingExtractedMetrics::new( diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 724642396a..dd7440dece 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -439,9 +439,7 @@ impl ProjectSource { UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); #[cfg(feature = "processing")] - let redis_maxconns = config - .redis() - .map(|configs| configs.project_configs.1.max_connections); + let redis_maxconns = config.redis().map(|(_, config)| config.max_connections); #[cfg(feature = "processing")] let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index f233998b09..a2a1a960d7 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; #[cfg(feature = "processing")] -use relay_redis::{RedisPool, RedisPools}; +use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, Service}; use tokio::time::interval; @@ -17,20 +17,20 @@ pub struct RelayStats { config: Arc, upstream_relay: Addr, #[cfg(feature = "processing")] - redis_pools: Option, + redis_pool: Option, } impl RelayStats { pub fn new( config: Arc, upstream_relay: Addr, - #[cfg(feature = "processing")] redis_pools: Option, + #[cfg(feature = "processing")] redis_pool: Option, ) -> Self { Self { config, upstream_relay, #[cfg(feature = "processing")] - redis_pools, + redis_pool, } } @@ -100,36 +100,18 @@ impl RelayStats { } } - #[cfg(feature = "processing")] - fn redis_pool(redis_pool: &RedisPool, name: &str) { - let state = redis_pool.stats(); - metric!( - gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections), - pool = name - ); - metric!( - gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections), - pool = name - ); - } - #[cfg(not(feature = "processing"))] - async fn redis_pools(&self) {} + async fn redis_pool(&self) {} #[cfg(feature = "processing")] - async fn redis_pools(&self) { - if let Some(RedisPools { - project_configs, - cardinality, - quotas, - misc, - }) = self.redis_pools.as_ref() - { - Self::redis_pool(project_configs, "project_configs"); - Self::redis_pool(cardinality, "cardinality"); - Self::redis_pool(quotas, "quotas"); - Self::redis_pool(misc, "misc"); - } + async fn redis_pool(&self) { + let Some(ref redis_pool) = self.redis_pool else { + return; + }; + + let state = redis_pool.stats(); + metric!(gauge(RelayGauges::RedisPoolConnections) = u64::from(state.connections)); + metric!(gauge(RelayGauges::RedisPoolIdleConnections) = u64::from(state.idle_connections)); } } @@ -146,7 +128,7 @@ impl Service for RelayStats { let _ = tokio::join!( self.upstream_status(), self.tokio_metrics(), - self.redis_pools(), + self.redis_pool(), ); ticker.tick().await; } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 715dbe8908..2a9703246f 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -12,11 +12,12 @@ use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use relay_system::Addr; use relay_test::mock_service; +#[cfg(feature = "processing")] +use {relay_config::RedisConnection, relay_redis::RedisPool}; + use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::metrics::{MetricOutcomes, MetricStats}; -#[cfg(feature = "processing")] -use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; @@ -122,7 +123,17 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = "processing")] - let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap(); + let redis = config + .redis() + .filter(|_| config.processing_enabled()) + .map(|redis| match redis { + (RedisConnection::Single(server), options) => { + RedisPool::single(server, options).unwrap() + } + (RedisConnection::Cluster(servers), options) => { + RedisPool::cluster(servers.iter().map(|s| s.as_str()), options).unwrap() + } + }); let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); @@ -133,7 +144,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - redis_pools, + redis, processor::Addrs { outcome_aggregator, project_cache, @@ -151,7 +162,18 @@ pub fn create_test_processor_with_addrs( addrs: processor::Addrs, ) -> EnvelopeProcessorService { #[cfg(feature = "processing")] - let redis_pools = config.redis().map(create_redis_pools).transpose().unwrap(); + let redis = config + .redis() + .filter(|_| config.processing_enabled()) + .map(|redis| match redis { + (RedisConnection::Single(server), options) => { + RedisPool::single(server, options).unwrap() + } + (RedisConnection::Cluster(servers), options) => { + RedisPool::cluster(servers.iter().map(|s| s.as_str()), options).unwrap() + } + }); + let metric_outcomes = MetricOutcomes::new(MetricStats::test().0, addrs.outcome_aggregator.clone()); @@ -162,7 +184,7 @@ pub fn create_test_processor_with_addrs( GlobalConfigHandle::fixed(Default::default()), Cogs::noop(), #[cfg(feature = "processing")] - redis_pools, + redis, addrs, metric_outcomes, )