Skip to content

Commit

Permalink
feat(subscriptions): Use received_p99 for scheduling subscriptions (#…
Browse files Browse the repository at this point in the history
…4922)

Errors and transactions have a "received" field. So let's use the p99 of that as the clock for subscription scheduling. This is a better timestamp to use than the Snuba one, as any backlogs in the ingest consumer will actually be taken into account in alerts. That is - if the ingest consumer is backlogging and messages delayed, subscriptions will not be run ahead of those messages being ingested.
  • Loading branch information
lynnagara authored Oct 26, 2023
1 parent 729cf29 commit 94c4701
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
4 changes: 2 additions & 2 deletions snuba/datasets/configuration/events/storages/errors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ stream_loader:
replacement_topic: event-replacements
commit_log_topic: snuba-commit-log
subscription_scheduler_mode: partition
subscription_synchronization_timestamp: orig_message_ts
subscription_synchronization_timestamp: received_p99
subscription_delay_seconds: 30
subscription_scheduled_topic: scheduled-subscriptions-events
subscription_result_topic: events-subscription-results
subscription_delay_seconds: 60
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ stream_loader:
default_topic: transactions
commit_log_topic: snuba-transactions-commit-log
subscription_scheduler_mode: global
subscription_synchronization_timestamp: orig_message_ts
subscription_synchronization_timestamp: received_p99
subscription_delay_seconds: 30
subscription_scheduled_topic: scheduled-subscriptions-transactions
subscription_result_topic: transactions-subscription-results
subscription_delay_seconds: 60
58 changes: 44 additions & 14 deletions tests/subscriptions/test_scheduler_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
scheduler._run_once()
scheduler._run_once()

epoch = datetime(1970, 1, 1)
epoch = 1000

producer = KafkaProducer(
build_kafka_producer_configuration(
stream_loader.get_default_topic_spec().topic,
)
)

for (partition, offset, orig_message_ts) in [
for (partition, offset, ts) in [
(0, 0, epoch),
(1, 0, epoch + timedelta(minutes=1)),
(0, 1, epoch + timedelta(minutes=2)),
(1, 1, epoch + timedelta(minutes=3)),
(1, 0, epoch + 60),
(0, 1, epoch + 120),
(1, 1, epoch + 180),
]:
fut = producer.produce(
commit_log_topic,
Expand All @@ -121,8 +121,8 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
"events",
Partition(commit_log_topic, partition),
offset,
orig_message_ts.timestamp(),
None,
ts,
ts,
)
),
)
Expand Down Expand Up @@ -180,7 +180,7 @@ def test_tick_consumer(time_shift: Optional[timedelta]) -> None:
Partition(topic, partition),
offset,
epoch.timestamp(),
None,
epoch.timestamp(),
)
)
producer.produce(Partition(topic, 0), payload).result()
Expand Down Expand Up @@ -328,7 +328,13 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 0, epoch.timestamp(), None)
Commit(
followed_consumer_group,
partition,
0,
epoch.timestamp(),
epoch.timestamp(),
)
),
).result()

Expand All @@ -337,7 +343,13 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 1, epoch.timestamp() + 1, None)
Commit(
followed_consumer_group,
partition,
1,
epoch.timestamp() + 1,
epoch.timestamp() + 1,
)
),
).result()

Expand All @@ -363,7 +375,13 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 2, epoch.timestamp(), None)
Commit(
followed_consumer_group,
partition,
2,
epoch.timestamp(),
epoch.timestamp(),
)
),
).result()

Expand All @@ -375,7 +393,13 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 3, epoch.timestamp() + 2, None)
Commit(
followed_consumer_group,
partition,
3,
epoch.timestamp() + 2,
epoch.timestamp() + 2,
)
),
).result()

Expand Down Expand Up @@ -445,15 +469,21 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
partition,
5,
now.timestamp(),
None,
now.timestamp(),
)
),
).result()

producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 4, now.timestamp() - 2, None)
Commit(
followed_consumer_group,
partition,
4,
now.timestamp() - 2,
now.timestamp() - 2,
)
),
).result()

Expand Down

0 comments on commit 94c4701

Please sign in to comment.