Skip to content

Commit

Permalink
ref(tokio): use a semaphore to limit redis blocking threads (#3841)
Browse files Browse the repository at this point in the history
SSIA

Follows #3840 

#skip-changelog
  • Loading branch information
gi0baro authored Jul 23, 2024
1 parent ab8210f commit 78050d3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
5 changes: 3 additions & 2 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ pub fn create_runtime(name: &str, threads: usize) -> Runtime {
// expands this pool very very aggressively and basically never shrinks it
// which leads to a massive resource waste.
.max_blocking_threads(150)
// As with the maximum amount of threads used by the runtime, we want
// to encourage the runtime to terminate blocking threads again.
// We also lower down the default (10s) keep alive timeout for blocking
// threads to encourage the runtime to not keep too many idle blocking threads
// around.
.thread_keep_alive(Duration::from_secs(1))
.enable_all()
.build()
Expand Down
12 changes: 12 additions & 0 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, Sender, Service};
use tokio::sync::mpsc;
#[cfg(feature = "processing")]
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::services::global_config::{self, GlobalConfigManager, Subscribe};
Expand Down Expand Up @@ -424,6 +426,8 @@ struct ProjectSource {
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
#[cfg(feature = "processing")]
redis_semaphore: Arc<Semaphore>,
}

impl ProjectSource {
Expand All @@ -437,6 +441,8 @@ impl ProjectSource {
let upstream_source =
UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();

#[cfg(feature = "processing")]
let redis_maxconns = config.redis().map(|(_, config)| config.max_connections);
#[cfg(feature = "processing")]
let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));

Expand All @@ -446,6 +452,10 @@ impl ProjectSource {
upstream_source,
#[cfg(feature = "processing")]
redis_source,
#[cfg(feature = "processing")]
redis_semaphore: Arc::new(Semaphore::new(
redis_maxconns.unwrap_or(10).try_into().unwrap(),
)),
}
}

Expand All @@ -469,10 +479,12 @@ impl ProjectSource {

#[cfg(feature = "processing")]
if let Some(redis_source) = self.redis_source {
let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let state_fetch_result =
tokio::task::spawn_blocking(move || redis_source.get_config(project_key))
.await
.map_err(|_| ())?;
drop(redis_permit);

let state_opt = match state_fetch_result {
Ok(state) => state.map(ProjectState::sanitize).map(Arc::new),
Expand Down

0 comments on commit 78050d3

Please sign in to comment.