fix(tests): Fix flaky integration tests (#3820)
iambriccardo authored Jul 15, 2024
1 parent c89421a commit 3c87eaf
Showing 2 changed files with 128 additions and 93 deletions.
4 changes: 1 addition & 3 deletions tests/integration/asserts/time.py
@@ -4,10 +4,8 @@
 def _to_datetime(v):
     if isinstance(v, datetime):
         return v
-    elif isinstance(v, int):
+    elif isinstance(v, (int, float)):
         return datetime.fromtimestamp(v, timezone.utc)
-    elif isinstance(v, float):
-        return datetime.utcfromtimestamp(v)
     elif isinstance(v, str):
         return datetime.fromisoformat(v)
     else:
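Note on the change above: datetime.utcfromtimestamp returns a naive datetime (no tzinfo) and is deprecated since Python 3.12, while datetime.fromtimestamp(v, timezone.utc) returns a timezone-aware one. The old code therefore converted float timestamps differently from int timestamps, and naive and aware datetimes never compare equal, which made time assertions on float values unreliable. After this patch the helper reads roughly as follows; the else branch is truncated in the diff above, so its body here is an assumption:

    from datetime import datetime, timezone

    def _to_datetime(v):
        if isinstance(v, datetime):
            return v
        elif isinstance(v, (int, float)):
            # int and float timestamps now both become timezone-aware datetimes
            return datetime.fromtimestamp(v, timezone.utc)
        elif isinstance(v, str):
            return datetime.fromisoformat(v)
        else:
            # the else branch is cut off in the diff; raising here is a guess
            raise TypeError(f"cannot convert {v!r} to datetime")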
217 changes: 127 additions & 90 deletions tests/integration/test_store.py
@@ -7,6 +7,7 @@
 from datetime import UTC, datetime, timedelta, timezone
 from time import sleep
 
+from .asserts import time_within_delta
 from .consts import (
     TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION,
     TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION,
@@ -598,11 +599,8 @@ def make_bucket(name, type_, values):
         ],
     )
 
-    with pytest.raises(
-        AssertionError, match="MetricsConsumer: Expected 6 messages, only got 5"
-    ):
-        # We will fail if we get more buckets than metric_bucket_limit.
-        metrics_consumer.get_metrics(n=metric_bucket_limit + 1)
+    metrics_consumer.get_metrics(n=metric_bucket_limit)
+    metrics_consumer.assert_empty()
 
 
 @pytest.mark.parametrize("violating_bucket", [2.0, 3.0])
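The hunk above removes the first source of flakiness: the old test asserted over-delivery by requesting metric_bucket_limit + 1 messages and matching the consumer's exact timeout message ("Expected 6 messages, only got 5"), which races against Kafka delivery timing and breaks if the fixture's wording changes. The new version consumes exactly metric_bucket_limit messages and then asserts the topic is drained. The MetricsConsumer internals are not part of this diff; a minimal sketch of what its assert_empty could look like, assuming a confluent_kafka-style poll() API:

    def assert_empty(self, timeout=1.0):
        # Poll once more; if anything is still buffered, the limit was exceeded.
        message = self.consumer.poll(timeout=timeout)
        assert message is None, f"expected no more messages, got: {message.value()}"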
@@ -653,9 +651,11 @@ def test_rate_limit_metrics_buckets(
         }
     ]
 
+    now = datetime.now(tz=timezone.utc).timestamp()
+
    def generate_ticks():
         # Generate a new timestamp for every bucket, so they do not get merged by the aggregator
-        tick = int(datetime.now(UTC).timestamp() // bucket_interval * bucket_interval)
+        tick = int(now // bucket_interval * bucket_interval)
         while True:
             yield tick
             tick += bucket_interval
@@ -673,128 +673,165 @@ def make_bucket(name, type_, values):
             "width": bucket_interval,
         }
 
-    def send_buckets(buckets):
-        relay.send_metrics_buckets(project_id, buckets)
-        sleep(0.2)
+    def assert_metrics(expected_metrics):
+        produced_metrics = [
+            m for m, _ in metrics_consumer.get_metrics(n=len(expected_metrics))
+        ]
+        produced_metrics.sort(key=lambda b: (b["name"], b["value"]))
+        assert produced_metrics == expected_metrics
 
     # NOTE: Sending these buckets in multiple envelopes because the order of flushing
     # and also the order of rate limiting is not deterministic.
-    send_buckets(
+    relay.send_metrics_buckets(
+        project_id,
         [
             # Send a few non-duration buckets, they will not deplete the quota
             make_bucket("d:transactions/measurements.lcp@millisecond", "d", 10 * [1.0]),
             # Session metrics are accepted
             make_bucket("d:sessions/session@none", "c", 1),
             make_bucket("d:sessions/duration@second", "d", 9 * [1]),
         ],
     )
+    assert_metrics(
+        [
+            {
+                "name": "d:sessions/duration@second",
+                "org_id": 1,
+                "project_id": 42,
+                "retention_days": 90,
+                "tags": {},
+                "type": "d",
+                "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            },
+            {
+                "name": "d:sessions/session@none",
+                "org_id": 1,
+                "retention_days": 90,
+                "project_id": 42,
+                "tags": {},
+                "type": "c",
+                "value": 1.0,
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            },
+            {
+                "name": "d:transactions/measurements.lcp@millisecond",
+                "org_id": 1,
+                "retention_days": 90,
+                "project_id": 42,
+                "tags": {},
+                "type": "d",
+                "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            },
+        ]
+    )
-    send_buckets(
+
+    relay.send_metrics_buckets(
+        project_id,
         [
             # Duration metric, subtract 3 from quota
             make_bucket("c:transactions/usage@none", "c", 3),
         ],
     )
-    send_buckets(
+    assert_metrics(
+        [
+            {
+                "name": "c:transactions/usage@none",
+                "org_id": 1,
+                "retention_days": 90,
+                "project_id": 42,
+                "tags": {},
+                "type": "c",
+                "value": 3.0,
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            },
+        ]
+    )
+
+    relay.send_metrics_buckets(
+        project_id,
         [
             # Can still send unlimited non-duration metrics
             make_bucket("d:transactions/measurements.lcp@millisecond", "d", 10 * [2.0]),
         ],
     )
-    send_buckets(
+    assert_metrics(
+        [
+            {
+                "name": "d:transactions/measurements.lcp@millisecond",
+                "org_id": 1,
+                "retention_days": 90,
+                "project_id": 42,
+                "tags": {},
+                "type": "d",
+                "value": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0],
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            }
+        ]
+    )
+
+    relay.send_metrics_buckets(
+        project_id,
         [
             # Usage metric, subtract from quota. This bucket is still accepted, but the rest
             # will be exceeded.
             make_bucket("c:transactions/usage@none", "c", violating_bucket),
         ],
     )
-    send_buckets(
+    assert_metrics(
         [
-            # FCP buckets won't make it into kakfa
+            {
+                "name": "c:transactions/usage@none",
+                "org_id": 1,
+                "retention_days": 90,
+                "project_id": 42,
+                "tags": {},
+                "type": "c",
+                "value": violating_bucket,
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            },
+        ]
+    )
+
+    relay.send_metrics_buckets(
+        project_id,
+        [
+            # FCP buckets won't make it into Kafka, since usage was now counted in Redis and it's >= 5.
             make_bucket("d:transactions/measurements.fcp@millisecond", "d", 10 * [7.0]),
         ],
     )
-    send_buckets(
+
+    relay.send_metrics_buckets(
+        project_id,
         [
             # Another three for usage, won't make it into kafka.
             make_bucket("c:transactions/usage@none", "c", 3),
             # Session metrics are still accepted.
             make_bucket("d:sessions/session@user", "s", [1254]),
         ],
     )
 
-    produced_buckets = [m for m, _ in metrics_consumer.get_metrics(timeout=10, n=7)]
-
-    # Sort buckets to prevent ordering flakiness:
-    produced_buckets.sort(key=lambda b: (b["name"], b["value"]))
-    for bucket in produced_buckets:
-        del bucket["timestamp"]
-        del bucket["received_at"]
-
-    assert produced_buckets == [
-        {
-            "name": "c:transactions/usage@none",
-            "org_id": 1,
-            "retention_days": 90,
-            "project_id": 42,
-            "tags": {},
-            "type": "c",
-            "value": violating_bucket,
-        },
-        {
-            "name": "c:transactions/usage@none",
-            "org_id": 1,
-            "retention_days": 90,
-            "project_id": 42,
-            "tags": {},
-            "type": "c",
-            "value": 3.0,
-        },
-        {
-            "name": "d:sessions/duration@second",
-            "org_id": 1,
-            "project_id": 42,
-            "retention_days": 90,
-            "tags": {},
-            "type": "d",
-            "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
-        },
-        {
-            "name": "d:sessions/session@none",
-            "org_id": 1,
-            "retention_days": 90,
-            "project_id": 42,
-            "tags": {},
-            "type": "c",
-            "value": 1.0,
-        },
-        {
-            "name": "d:sessions/session@user",
-            "org_id": 1,
-            "retention_days": 90,
-            "project_id": 42,
-            "tags": {},
-            "type": "s",
-            "value": [1254],
-        },
-        {
-            "name": "d:transactions/measurements.lcp@millisecond",
-            "org_id": 1,
-            "retention_days": 90,
-            "project_id": 42,
-            "tags": {},
-            "type": "d",
-            "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
-        },
-        {
-            "name": "d:transactions/measurements.lcp@millisecond",
-            "org_id": 1,
-            "retention_days": 90,
-            "project_id": 42,
-            "tags": {},
-            "type": "d",
-            "value": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0],
-        },
-    ]
+    assert_metrics(
+        [
+            {
+                "name": "d:sessions/session@user",
+                "org_id": 1,
+                "retention_days": 90,
+                "project_id": 42,
+                "tags": {},
+                "type": "s",
+                "value": [1254],
+                "timestamp": time_within_delta(now),
+                "received_at": time_within_delta(now),
+            }
+        ]
+    )
 
     outcomes_consumer.assert_rate_limited(
         reason_code,
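The second test is deflaked by three coordinated changes visible in the hunk above: a single now timestamp is captured once and reused both for generating bucket ticks and for the expected values; each envelope whose buckets should reach Kafka is followed immediately by an assert_metrics call that reads exactly as many messages as it expects, replacing the sleep(0.2)-based send_buckets helper and the single bulk get_metrics(timeout=10, n=7) read at the end; and the timestamp and received_at fields are matched with time_within_delta(now) instead of being deleted before comparison. time_within_delta is imported from the asserts package patched above; a hypothetical sketch of such a matcher, built on the same _to_datetime helper (the actual implementation is not shown in this diff):

    from datetime import datetime, timedelta, timezone

    class time_within_delta:
        """Equality matcher: compares equal to any timestamp near a reference point."""

        def __init__(self, expected=None, delta=timedelta(seconds=30)):
            self.expected = _to_datetime(expected) if expected is not None else datetime.now(timezone.utc)
            self.delta = delta

        def __eq__(self, other):
            # `other` may be an int, float, str, or datetime; normalize before comparing,
            # so dict equality in assert_metrics tolerates small clock skew.
            return abs(_to_datetime(other) - self.expected) <= self.delta

        def __repr__(self):
            return f"<time within {self.delta} of {self.expected}>"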
