Skip to content

Commit

Permalink
fix(rust): Don't double-unsubscribe in LocalConsumer (#5122)
Browse files Browse the repository at this point in the history
  • Loading branch information
loewenheim authored Nov 30, 2023
1 parent 388639f commit 825f8d4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
11 changes: 9 additions & 2 deletions rust_snuba/rust_arroyo/src/backends/local/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,15 @@ impl<TPayload> LocalBroker<TPayload> {

pub fn unsubscribe(&mut self, id: Uuid, group: String) -> Result<Vec<Partition>, BrokerError> {
let mut ret_partitions = Vec::new();
let group_subscriptions = self.subscriptions.get_mut(&group).unwrap();
let subscribed_topics = group_subscriptions.get(&id).unwrap();

let Some(group_subscriptions) = self.subscriptions.get_mut(&group) else {
return Ok(vec![]);
};

let Some(subscribed_topics) = group_subscriptions.get(&id) else {
return Ok(vec![]);
};

for topic in subscribed_topics.iter() {
let partitions = self.storage.partition_count(topic)?;
for n in 0..partitions {
Expand Down
22 changes: 12 additions & 10 deletions rust_snuba/rust_arroyo/src/backends/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,18 @@ impl<TPayload: 'static, C: AssignmentCallbacks> Consumer<TPayload, C>
}

fn close(&mut self) {
let partitions = self
.broker
.unsubscribe(self.id, self.group.clone())
.unwrap();
if let Some(c) = self.subscription_state.callbacks.as_mut() {
let offset_stage = OffsetCommitter {
group: &self.group,
broker: &mut self.broker,
};
c.on_revoke(offset_stage, partitions);
if !self.subscription_state.topics.is_empty() {
let partitions = self
.broker
.unsubscribe(self.id, self.group.clone())
.unwrap();
if let Some(c) = self.subscription_state.callbacks.as_mut() {
let offset_stage = OffsetCommitter {
group: &self.group,
broker: &mut self.broker,
};
c.on_revoke(offset_stage, partitions);
}
}
self.closed = true;
self.close_calls += 1;
Expand Down

0 comments on commit 825f8d4

Please sign in to comment.