From 825f8d47d846a1fafff99c9c98c0b565939d8553 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Thu, 30 Nov 2023 14:25:00 +0100 Subject: [PATCH] fix(rust): Don't double-unsubscribe in LocalConsumer (#5122) --- .../rust_arroyo/src/backends/local/broker.rs | 11 ++++++++-- .../rust_arroyo/src/backends/local/mod.rs | 22 ++++++++++--------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/rust_snuba/rust_arroyo/src/backends/local/broker.rs b/rust_snuba/rust_arroyo/src/backends/local/broker.rs index 97c4b2b0b8..8c6b1b2858 100644 --- a/rust_snuba/rust_arroyo/src/backends/local/broker.rs +++ b/rust_snuba/rust_arroyo/src/backends/local/broker.rs @@ -122,8 +122,15 @@ impl LocalBroker { pub fn unsubscribe(&mut self, id: Uuid, group: String) -> Result, BrokerError> { let mut ret_partitions = Vec::new(); - let group_subscriptions = self.subscriptions.get_mut(&group).unwrap(); - let subscribed_topics = group_subscriptions.get(&id).unwrap(); + + let Some(group_subscriptions) = self.subscriptions.get_mut(&group) else { + return Ok(vec![]); + }; + + let Some(subscribed_topics) = group_subscriptions.get(&id) else { + return Ok(vec![]); + }; + for topic in subscribed_topics.iter() { let partitions = self.storage.partition_count(topic)?; for n in 0..partitions { diff --git a/rust_snuba/rust_arroyo/src/backends/local/mod.rs b/rust_snuba/rust_arroyo/src/backends/local/mod.rs index 8b1d9ee550..ad11d8bd26 100644 --- a/rust_snuba/rust_arroyo/src/backends/local/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/local/mod.rs @@ -246,16 +246,18 @@ impl Consumer } fn close(&mut self) { - let partitions = self - .broker - .unsubscribe(self.id, self.group.clone()) - .unwrap(); - if let Some(c) = self.subscription_state.callbacks.as_mut() { - let offset_stage = OffsetCommitter { - group: &self.group, - broker: &mut self.broker, - }; - c.on_revoke(offset_stage, partitions); + if !self.subscription_state.topics.is_empty() { + let partitions = self + .broker + .unsubscribe(self.id, self.group.clone()) + .unwrap(); + if let Some(c) = self.subscription_state.callbacks.as_mut() { + let offset_stage = OffsetCommitter { + group: &self.group, + broker: &mut self.broker, + }; + c.on_revoke(offset_stage, partitions); + } } self.closed = true; self.close_calls += 1;