feat(memory): Implement shared memory state across Relay (#3821)
iambriccardo authored Jul 22, 2024
1 parent 9be7922 commit ebf3351
Showing 24 changed files with 521 additions and 807 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
**Internal**:

- Use a dedicated thread pool for CPU intensive workloads. ([#3833](https://github.com/getsentry/relay/pull/3833))
- Remove `BufferGuard` in favor of memory checks via `MemoryStat`. ([#3821](https://github.com/getsentry/relay/pull/3821))

## 24.7.0

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -64,6 +64,7 @@ anyhow = "1.0.66"
axum = "0.6.20"
axum-extra = "0.7.7"
axum-server = "0.4.7"
arc-swap = "1.7.1"
backoff = "0.4.0"
bindgen = "0.64.0"
brotli = "3.3.4"
14 changes: 13 additions & 1 deletion relay-config/src/config.rs
@@ -1350,7 +1350,7 @@ pub struct Health {
pub refresh_interval_ms: u64,
/// Maximum memory watermark in bytes.
///
/// By default there is no absolute limit set and the watermark
/// By default, there is no absolute limit set and the watermark
/// is only controlled by setting [`Self::max_memory_percent`].
pub max_memory_bytes: Option<ByteSize>,
/// Maximum memory watermark as a percentage of maximum system memory.
@@ -1364,6 +1364,12 @@ pub struct Health {
///
/// Defaults to 900 milliseconds.
pub probe_timeout_ms: u64,
/// The refresh frequency of memory stats which are used to poll memory
/// usage of Relay.
///
/// The implementation of memory stats guarantees that the refresh will happen at
/// least every `x` ms since memory readings are lazy and are updated only if needed.
pub memory_stat_refresh_frequency_ms: u64,
}

impl Default for Health {
@@ -1373,6 +1379,7 @@ impl Default for Health {
max_memory_bytes: None,
max_memory_percent: 0.95,
probe_timeout_ms: 900,
memory_stat_refresh_frequency_ms: 100,
}
}
}
@@ -2346,6 +2353,11 @@ impl Config {
Duration::from_millis(self.values.health.probe_timeout_ms)
}

/// Refresh frequency for polling new memory stats.
pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
self.values.health.memory_stat_refresh_frequency_ms
}

/// Whether COGS measurements are enabled.
pub fn cogs_enabled(&self) -> bool {
self.values.cogs.enabled
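The new `memory_stat_refresh_frequency_ms` doc comment above describes lazy, rate-limited readings: memory is only re-read when the cached value is older than the configured interval. A minimal sketch of that pattern, with invented type and helper names rather than the actual `MemoryStat` internals:

```rust
use std::time::{Duration, Instant};

/// Illustrative only: caches one memory reading and refreshes it at most once
/// per `refresh_interval`, matching the lazy behavior described in the docs.
struct LazyMemoryReading {
    refresh_interval: Duration,
    last_refresh: Instant,
    used_bytes: u64,
}

impl LazyMemoryReading {
    fn used(&mut self) -> u64 {
        // Re-read system memory only if the cached value is stale.
        if self.last_refresh.elapsed() >= self.refresh_interval {
            self.used_bytes = read_system_memory(); // hypothetical helper
            self.last_refresh = Instant::now();
        }
        self.used_bytes
    }
}

/// Placeholder for a sysinfo-based reading.
fn read_system_memory() -> u64 {
    0
}
```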
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
@@ -46,6 +46,7 @@ axum = { workspace = true, features = [
"tracing",
] }
axum-server = { workspace = true }
arc-swap = { workspace = true }
backoff = { workspace = true }
brotli = { workspace = true }
bytes = { workspace = true, features = ["serde"] }
51 changes: 25 additions & 26 deletions relay-server/src/endpoints/common.rs
@@ -14,9 +14,7 @@ use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup};
use crate::services::project_cache::{CheckEnvelope, ValidateEnvelope};
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{
self, ApiErrorResponse, BufferError, BufferGuard, FormDataIter, ManagedEnvelope, MultipartError,
};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError};

#[derive(Clone, Copy, Debug, thiserror::Error)]
#[error("the service is overloaded")]
@@ -75,7 +73,7 @@ pub enum BadStoreRequest {
InvalidEventId,

#[error("failed to queue envelope")]
QueueFailed(#[from] BufferError),
QueueFailed,

#[error(
"envelope exceeded size limits for type '{0}' (https://develop.sentry.dev/sdk/envelopes/#size-limits)"
@@ -114,7 +112,7 @@ impl IntoResponse for BadStoreRequest {

(StatusCode::TOO_MANY_REQUESTS, headers, body).into_response()
}
BadStoreRequest::ScheduleFailed | BadStoreRequest::QueueFailed(_) => {
BadStoreRequest::ScheduleFailed | BadStoreRequest::QueueFailed => {
// These errors indicate that something's wrong with our service system, most likely
// mailbox congestion or a faulty shutdown. Indicate an unavailable service to the
// client. It might retry event submission at a later time.
@@ -264,7 +262,6 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreReq
fn queue_envelope(
state: &ServiceState,
mut managed_envelope: ManagedEnvelope,
buffer_guard: &BufferGuard,
) -> Result<(), BadStoreRequest> {
let envelope = managed_envelope.envelope_mut();

@@ -299,14 +296,13 @@
// Split off the envelopes by item type.
let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope());
for (group, envelope) in envelopes {
let envelope = buffer_guard
.enter(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
group,
)
.map_err(BadStoreRequest::QueueFailed)?;
let envelope = ManagedEnvelope::new(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
group,
);

state.project_cache().send(ValidateEnvelope::new(envelope));
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
@@ -335,17 +331,20 @@ pub async fn handle_envelope(
)
}

let buffer_guard = state.buffer_guard();
let mut managed_envelope = buffer_guard
.enter(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
// It's not clear at this point which group this envelope belongs to.
// The decission will be made while queueing in `queue_envelope` function.
ProcessingGroup::Ungrouped,
)
.map_err(BadStoreRequest::QueueFailed)?;
if state.memory_checker().check_memory().is_exceeded() {
// NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead.
// This will be fixed with the new spool implementation.
return Err(BadStoreRequest::QueueFailed);
};

let mut managed_envelope = ManagedEnvelope::new(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
// It's not clear at this point which group this envelope belongs to.
// The decision will be made while queueing in `queue_envelope` function.
ProcessingGroup::Ungrouped,
);

// If configured, remove unknown items at the very beginning. If the envelope is
// empty, we fail the request with a special control flow error to skip checks and
@@ -377,7 +376,7 @@
return Err(BadStoreRequest::Overflow(offender));
}

queue_envelope(state, managed_envelope, buffer_guard)?;
queue_envelope(state, managed_envelope)?;

if checked.rate_limits.is_limited() {
// Even if some envelope items have been queued, there might be active rate limits on
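The call sites above (`check_memory().is_exceeded()` here, and `MemoryCheck::Exceeded(..)` in the health check further down) imply a small result type along these lines; the exact shape is an assumption, not the PR's actual definition:

```rust
/// Sketch of a memory snapshot; field names are assumed from the log calls
/// in the health check diff below.
struct Memory {
    used: u64,
    total: u64,
}

/// Sketch of the check result implied by the call sites in this diff.
enum MemoryCheck {
    Ok(Memory),
    Exceeded(Memory),
}

impl MemoryCheck {
    /// Mirrors the `is_exceeded()` call used in `handle_envelope`.
    fn is_exceeded(&self) -> bool {
        matches!(self, MemoryCheck::Exceeded(_))
    }
}
```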
26 changes: 14 additions & 12 deletions relay-server/src/service.rs
@@ -28,7 +28,7 @@ use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::store::StoreService;
use crate::services::test_store::{TestStore, TestStoreService};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::BufferGuard;
use crate::utils::{MemoryChecker, MemoryStat};

/// Indicates the type of failure of the server.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
@@ -136,7 +136,7 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> {
#[derive(Debug)]
struct StateInner {
config: Arc<Config>,
buffer_guard: Arc<BufferGuard>,
memory_checker: MemoryChecker,
registry: Registry,
}

@@ -164,7 +164,9 @@ impl ServiceState {
.transpose()
.context(ServiceError::Redis)?;

let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size()));
// We create an instance of `MemoryStat`, which can be composed with any arbitrary
// configuration object down the line.
let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());

// Create an address for the `EnvelopeProcessor`, which can be injected into the
// other services.
@@ -254,7 +256,7 @@ impl ServiceState {
);
ProjectCacheService::new(
config.clone(),
buffer_guard.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_services,
metric_outcomes,
redis_pool.clone(),
@@ -263,6 +265,7 @@

let health_check = HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator.clone(),
upstream_relay.clone(),
project_cache.clone(),
@@ -293,8 +296,8 @@
};

let state = StateInner {
buffer_guard,
config,
config: config.clone(),
memory_checker: MemoryChecker::new(memory_stat, config),
registry,
};

@@ -308,12 +311,11 @@
&self.inner.config
}

/// Returns a reference to the guard of the envelope buffer.
///
/// This can be used to enter new envelopes into the processing queue and reserve a slot in the
/// buffer. See [`BufferGuard`] for more information.
pub fn buffer_guard(&self) -> &BufferGuard {
&self.inner.buffer_guard
/// Returns a reference to the [`MemoryChecker`] which is a [`Config`] aware wrapper on the
/// [`MemoryStat`] which gives utility methods to determine whether memory usage is above
/// thresholds set in the [`Config`].
pub fn memory_checker(&self) -> &MemoryChecker {
&self.inner.memory_checker
}

/// Returns the address of the [`ProjectCache`] service.
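This PR adds `arc-swap` to the dependencies and hands the same `memory_stat` (via `clone()`) to several services, which suggests `MemoryStat` is a cheap, clonable handle over shared state. A hedged sketch of that pattern, with invented type and field names:

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

/// Illustrative snapshot type; not the actual struct from the PR.
#[derive(Debug)]
struct MemoryReading {
    used: u64,
    total: u64,
}

/// Cheap to clone: every clone shares the same inner state, so all services
/// holding a checker built from this handle observe the same readings.
#[derive(Clone, Debug)]
struct SharedMemoryStat {
    inner: Arc<ArcSwap<MemoryReading>>,
}

impl SharedMemoryStat {
    fn new() -> Self {
        Self {
            inner: Arc::new(ArcSwap::from_pointee(MemoryReading { used: 0, total: 0 })),
        }
    }

    /// Lock-free read of the latest snapshot.
    fn current(&self) -> Arc<MemoryReading> {
        self.inner.load_full()
    }

    /// Publish a new snapshot that all clones will see.
    fn update(&self, reading: MemoryReading) {
        self.inner.store(Arc::new(reading));
    }
}
```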
84 changes: 8 additions & 76 deletions relay-server/src/services/health_check.rs
@@ -1,17 +1,16 @@
use std::sync::Arc;

use relay_config::Config;
use relay_statsd::metric;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use std::future::Future;
use sysinfo::{MemoryRefreshKind, System};
use tokio::sync::watch;
use tokio::time::{timeout, Instant};

use crate::services::metrics::{AcceptsMetrics, Aggregator};
use crate::services::project_cache::{ProjectCache, SpoolHealth};
use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
use crate::statsd::{RelayGauges, RelayTimers};
use crate::statsd::RelayTimers;
use crate::utils::{MemoryCheck, MemoryChecker};

/// Checks whether Relay is alive and healthy based on its variant.
#[derive(Clone, Copy, Debug, serde::Deserialize)]
@@ -84,10 +83,10 @@ impl StatusUpdate {
#[derive(Debug)]
pub struct HealthCheckService {
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
system: System,
}

impl HealthCheckService {
@@ -96,44 +95,22 @@ impl HealthCheckService {
/// The service does not run. To run the service, use [`start`](Self::start).
pub fn new(
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
) -> Self {
Self {
system: System::new(),
config,
memory_checker,
aggregator,
upstream_relay,
project_cache,
config,
}
}

fn system_memory_probe(&mut self) -> Status {
self.system
.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());

// Use the cgroup if available in case Relay is running in a container.
// TODO: once we measured the new rss metric, we will remove `rss` and just used cgroup.rss
// `used`.
let memory = match self.system.cgroup_limits() {
Some(cgroup) => Memory {
used: cgroup.total_memory.saturating_sub(cgroup.free_memory),
total: cgroup.total_memory,
rss: cgroup.rss,
},
None => Memory {
used: self.system.used_memory(),
total: self.system.total_memory(),
rss: self.system.used_memory(),
},
};

metric!(gauge(RelayGauges::SystemMemoryUsed) = memory.used);
metric!(gauge(RelayGauges::SystemMemoryTotal) = memory.total);
metric!(gauge(RelayGauges::SystemMemoryRss) = memory.rss);

if memory.used_percent() >= self.config.health_max_memory_watermark_percent() {
if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_percent() {
relay_log::error!(
"Not enough memory, {} / {} ({:.2}% >= {:.2}%)",
memory.used,
@@ -144,7 +121,7 @@
return Status::Unhealthy;
}

if memory.used > self.config.health_max_memory_watermark_bytes() {
if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_bytes() {
relay_log::error!(
"Not enough memory, {} / {} ({} >= {})",
memory.used,
@@ -254,21 +231,6 @@
}
}

/// A memory measurement.
#[derive(Debug)]
struct Memory {
pub used: u64,
pub total: u64,
pub rss: u64,
}

impl Memory {
/// Amount of used RAM in percent `0.0` to `1.0`.
pub fn used_percent(&self) -> f32 {
(self.used as f32 / self.total as f32).clamp(0.0, 1.0)
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -303,34 +265,4 @@ mod tests {
let s = [].into_iter().collect();
assert!(matches!(s, Status::Healthy));
}

#[test]
fn test_memory_used_percent_total_0() {
let memory = Memory {
used: 100,
total: 0,
rss: 0,
};
assert_eq!(memory.used_percent(), 1.0);
}

#[test]
fn test_memory_used_percent_zero() {
let memory = Memory {
used: 0,
total: 100,
rss: 0,
};
assert_eq!(memory.used_percent(), 0.0);
}

#[test]
fn test_memory_used_percent_half() {
let memory = Memory {
used: 50,
total: 100,
rss: 0,
};
assert_eq!(memory.used_percent(), 0.5);
}
}
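The `Memory::used_percent` helper and its tests removed from this file presumably moved behind `MemoryChecker`, which now compares readings against the watermarks from `Config`. A hedged sketch of how `check_memory_percent` and `check_memory_bytes` could combine the two, reusing the `Memory`/`MemoryCheck` shapes sketched earlier (names and fields are assumptions):

```rust
/// Assumed snapshot and result types, as in the earlier sketch.
struct Memory {
    used: u64,
    total: u64,
}

enum MemoryCheck {
    Ok(Memory),
    Exceeded(Memory),
}

/// Sketch of a config-aware checker; the thresholds would come from
/// `health_max_memory_watermark_percent()` / `..._bytes()` seen in the
/// removed code above.
struct MemoryChecker {
    max_percent: f32,
    max_bytes: u64,
}

impl MemoryChecker {
    fn check_memory_percent(&self, memory: Memory) -> MemoryCheck {
        // Same semantics as the removed `used_percent` helper.
        let used_percent = (memory.used as f32 / memory.total as f32).clamp(0.0, 1.0);
        if used_percent >= self.max_percent {
            MemoryCheck::Exceeded(memory)
        } else {
            MemoryCheck::Ok(memory)
        }
    }

    fn check_memory_bytes(&self, memory: Memory) -> MemoryCheck {
        if memory.used > self.max_bytes {
            MemoryCheck::Exceeded(memory)
        } else {
            MemoryCheck::Ok(memory)
        }
    }
}
```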