diff --git a/requirements.txt b/requirements.txt index d45afa1c07..58c57916d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ pytest-watch==4.2.0 python-dateutil==2.8.2 python-rapidjson==1.8 redis==4.3.4 -sentry-arroyo==2.17.1 +sentry-arroyo==2.17.6 sentry-kafka-schemas==0.1.106 sentry-redis-tools==0.3.0 sentry-relay==0.8.44 diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 620e3027cb..b0b0025916 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -2852,8 +2852,8 @@ dependencies = [ [[package]] name = "rust_arroyo" -version = "2.17.4" -source = "git+https://github.com/getsentry/arroyo#b15d5467a4958b18ba50175e468e4b70535cb669" +version = "2.17.6" +source = "git+https://github.com/getsentry/arroyo#084c60732a11006a70e6ae56f0ae1cdf8a6dd7d8" dependencies = [ "chrono", "coarsetime", diff --git a/rust_snuba/bin/python_processor_infinite.rs b/rust_snuba/bin/python_processor_infinite.rs index 8aaf285836..19f1cf7908 100644 --- a/rust_snuba/bin/python_processor_infinite.rs +++ b/rust_snuba/bin/python_processor_infinite.rs @@ -20,9 +20,9 @@ fn main() { let output2 = output.clone(); let step = RunTask::new( - move |_| { + move |message| { output2.fetch_add(1, Ordering::Relaxed); - Ok(()) + Ok(message) }, step, ); diff --git a/snuba/subscriptions/scheduler_consumer.py b/snuba/subscriptions/scheduler_consumer.py index 794b3fe6c6..cda3021cbe 100644 --- a/snuba/subscriptions/scheduler_consumer.py +++ b/snuba/subscriptions/scheduler_consumer.py @@ -210,6 +210,10 @@ def close(self, timeout: Optional[float] = None) -> None: def closed(self) -> bool: return self.__consumer.closed + @property + def member_id(self) -> str: + return self.__consumer.member_id + class SchedulerBuilder: def __init__(