diff --git a/tests/integration/asserts/time.py b/tests/integration/asserts/time.py index e432e8249f..a0f8ae19be 100644 --- a/tests/integration/asserts/time.py +++ b/tests/integration/asserts/time.py @@ -4,10 +4,8 @@ def _to_datetime(v): if isinstance(v, datetime): return v - elif isinstance(v, int): + elif isinstance(v, (int, float)): return datetime.fromtimestamp(v, timezone.utc) - elif isinstance(v, float): - return datetime.utcfromtimestamp(v) elif isinstance(v, str): return datetime.fromisoformat(v) else: diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 2c029e8a13..e49951bd3b 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -7,6 +7,7 @@ from datetime import UTC, datetime, timedelta, timezone from time import sleep +from .asserts import time_within_delta from .consts import ( TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION, @@ -598,11 +599,8 @@ def make_bucket(name, type_, values): ], ) - with pytest.raises( - AssertionError, match="MetricsConsumer: Expected 6 messages, only got 5" - ): - # We will fail if we get more buckets than metric_bucket_limit. - metrics_consumer.get_metrics(n=metric_bucket_limit + 1) + metrics_consumer.get_metrics(n=metric_bucket_limit) + metrics_consumer.assert_empty() @pytest.mark.parametrize("violating_bucket", [2.0, 3.0]) @@ -653,9 +651,11 @@ def test_rate_limit_metrics_buckets( } ] + now = datetime.now(tz=timezone.utc).timestamp() + def generate_ticks(): # Generate a new timestamp for every bucket, so they do not get merged by the aggregator - tick = int(datetime.now(UTC).timestamp() // bucket_interval * bucket_interval) + tick = int(now // bucket_interval * bucket_interval) while True: yield tick tick += bucket_interval @@ -673,47 +673,143 @@ def make_bucket(name, type_, values): "width": bucket_interval, } - def send_buckets(buckets): - relay.send_metrics_buckets(project_id, buckets) - sleep(0.2) + def assert_metrics(expected_metrics): + produced_metrics = [ + m for m, _ in metrics_consumer.get_metrics(n=len(expected_metrics)) + ] + produced_metrics.sort(key=lambda b: (b["name"], b["value"])) + assert produced_metrics == expected_metrics # NOTE: Sending these buckets in multiple envelopes because the order of flushing # and also the order of rate limiting is not deterministic. - send_buckets( + relay.send_metrics_buckets( + project_id, [ # Send a few non-duration buckets, they will not deplete the quota make_bucket("d:transactions/measurements.lcp@millisecond", "d", 10 * [1.0]), # Session metrics are accepted make_bucket("d:sessions/session@none", "c", 1), make_bucket("d:sessions/duration@second", "d", 9 * [1]), + ], + ) + assert_metrics( + [ + { + "name": "d:sessions/duration@second", + "org_id": 1, + "project_id": 42, + "retention_days": 90, + "tags": {}, + "type": "d", + "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + }, + { + "name": "d:sessions/session@none", + "org_id": 1, + "retention_days": 90, + "project_id": 42, + "tags": {}, + "type": "c", + "value": 1.0, + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + }, + { + "name": "d:transactions/measurements.lcp@millisecond", + "org_id": 1, + "retention_days": 90, + "project_id": 42, + "tags": {}, + "type": "d", + "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + }, ] ) - send_buckets( + + relay.send_metrics_buckets( + project_id, [ # Duration metric, subtract 3 from quota make_bucket("c:transactions/usage@none", "c", 3), ], ) - send_buckets( + assert_metrics( + [ + { + "name": "c:transactions/usage@none", + "org_id": 1, + "retention_days": 90, + "project_id": 42, + "tags": {}, + "type": "c", + "value": 3.0, + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + }, + ] + ) + + relay.send_metrics_buckets( + project_id, [ # Can still send unlimited non-duration metrics make_bucket("d:transactions/measurements.lcp@millisecond", "d", 10 * [2.0]), ], ) - send_buckets( + assert_metrics( + [ + { + "name": "d:transactions/measurements.lcp@millisecond", + "org_id": 1, + "retention_days": 90, + "project_id": 42, + "tags": {}, + "type": "d", + "value": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0], + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + } + ] + ) + + relay.send_metrics_buckets( + project_id, [ # Usage metric, subtract from quota. This bucket is still accepted, but the rest # will be exceeded. make_bucket("c:transactions/usage@none", "c", violating_bucket), ], ) - send_buckets( + assert_metrics( [ - # FCP buckets won't make it into kakfa + { + "name": "c:transactions/usage@none", + "org_id": 1, + "retention_days": 90, + "project_id": 42, + "tags": {}, + "type": "c", + "value": violating_bucket, + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + }, + ] + ) + + relay.send_metrics_buckets( + project_id, + [ + # FCP buckets won't make it into Kafka, since usage was now counted in Redis and it's >= 5. make_bucket("d:transactions/measurements.fcp@millisecond", "d", 10 * [7.0]), ], ) - send_buckets( + + relay.send_metrics_buckets( + project_id, [ # Another three for usage, won't make it into kafka. make_bucket("c:transactions/usage@none", "c", 3), @@ -721,80 +817,21 @@ def send_buckets(buckets): make_bucket("d:sessions/session@user", "s", [1254]), ], ) - - produced_buckets = [m for m, _ in metrics_consumer.get_metrics(timeout=10, n=7)] - - # Sort buckets to prevent ordering flakiness: - produced_buckets.sort(key=lambda b: (b["name"], b["value"])) - for bucket in produced_buckets: - del bucket["timestamp"] - del bucket["received_at"] - - assert produced_buckets == [ - { - "name": "c:transactions/usage@none", - "org_id": 1, - "retention_days": 90, - "project_id": 42, - "tags": {}, - "type": "c", - "value": violating_bucket, - }, - { - "name": "c:transactions/usage@none", - "org_id": 1, - "retention_days": 90, - "project_id": 42, - "tags": {}, - "type": "c", - "value": 3.0, - }, - { - "name": "d:sessions/duration@second", - "org_id": 1, - "project_id": 42, - "retention_days": 90, - "tags": {}, - "type": "d", - "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - }, - { - "name": "d:sessions/session@none", - "org_id": 1, - "retention_days": 90, - "project_id": 42, - "tags": {}, - "type": "c", - "value": 1.0, - }, - { - "name": "d:sessions/session@user", - "org_id": 1, - "retention_days": 90, - "project_id": 42, - "tags": {}, - "type": "s", - "value": [1254], - }, - { - "name": "d:transactions/measurements.lcp@millisecond", - "org_id": 1, - "retention_days": 90, - "project_id": 42, - "tags": {}, - "type": "d", - "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - }, - { - "name": "d:transactions/measurements.lcp@millisecond", - "org_id": 1, - "retention_days": 90, - "project_id": 42, - "tags": {}, - "type": "d", - "value": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0], - }, - ] + assert_metrics( + [ + { + "name": "d:sessions/session@user", + "org_id": 1, + "retention_days": 90, + "project_id": 42, + "tags": {}, + "type": "s", + "value": [1254], + "timestamp": time_within_delta(now), + "received_at": time_within_delta(now), + } + ] + ) outcomes_consumer.assert_rate_limited( reason_code,