diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 71e3186f3f..320c5494ca 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -113,13 +113,16 @@ impl AggregatorService { config: AggregatorServiceConfig, receiver: Option>, ) -> Self { + let aggregator = aggregator::Aggregator::named(name, config.aggregator); Self { receiver, state: AggregatorState::Running, max_total_bucket_bytes: config.max_total_bucket_bytes, - aggregator: aggregator::Aggregator::named(name, config.aggregator), flush_interval_ms: config.flush_interval_ms, - can_accept_metrics: Arc::new(AtomicBool::new(true)), + can_accept_metrics: Arc::new(AtomicBool::new( + !aggregator.totals_cost_exceeded(config.max_total_bucket_bytes), + )), + aggregator, } } diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py index 7a1505b2ba..e2f9ee3ed0 100644 --- a/tests/integration/test_healthchecks.py +++ b/tests/integration/test_healthchecks.py @@ -123,6 +123,29 @@ def test_readiness_depends_on_aggregator_being_full(mini_sentry, relay): assert response.status_code == 503 +def test_readiness_depends_on_aggregator_being_full_after_metrics(mini_sentry, relay): + relay = relay( + mini_sentry, + {"aggregator": {"max_total_bucket_bytes": 1}}, + ) + + metrics_payload = "transactions/foo:42|c\ntransactions/bar:17|c" + relay.send_metrics(42, metrics_payload) + + for _ in range(100): + response = wait_get(relay, "/api/relay/healthcheck/ready/") + print(response, response.status_code) + if response.status_code == 503: + error = str(mini_sentry.test_failures.pop()) + assert "Health check probe 'aggregator'" in error + error = str(mini_sentry.test_failures.pop()) + assert "aggregator limit exceeded" in error + return + time.sleep(0.1) + + assert False, "health check never failed" + + def test_readiness_disk_spool(mini_sentry, relay): try: temp = tempfile.mkdtemp()