From 4149a19f233ac111d9b398a702fd8ff75adf3925 Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Wed, 11 Sep 2024 13:57:02 +0200 Subject: [PATCH] Improve partition ID calculation (#1236) --- Cargo.lock | 2 +- server/Cargo.toml | 2 +- server/src/streaming/topics/consumer_group.rs | 13 +++++++++++-- server/src/tcp/tcp_listener.rs | 2 +- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85d9d0439..608da2b93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3838,7 +3838,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.40" +version = "0.4.41" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/server/Cargo.toml b/server/Cargo.toml index d78474941..7b237c68e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.40" +version = "0.4.41" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/topics/consumer_group.rs b/server/src/streaming/topics/consumer_group.rs index 7bcc0b96c..45eff1d4a 100644 --- a/server/src/streaming/topics/consumer_group.rs +++ b/server/src/streaming/topics/consumer_group.rs @@ -131,9 +131,18 @@ impl ConsumerGroupMember { pub fn calculate_partition_id(&mut self) -> u32 { let partition_index = self.current_partition_index; - let partition_id = *self.partitions.get(&partition_index).unwrap(); + let partition_id = if let Some(partition_id) = self.partitions.get(&partition_index) { + *partition_id + } else { + trace!( + "No partition ID found for index: {} for member with ID: {}.", + partition_index, + self.id + ); + return 1; + }; self.current_partition_id = partition_id; - if self.partitions.len() == (partition_index + 1) as usize { + if self.partitions.len() <= (partition_index + 1) as usize { self.current_partition_index = 0; } else { self.current_partition_index += 1; diff --git a/server/src/tcp/tcp_listener.rs b/server/src/tcp/tcp_listener.rs index bb6e27d5c..a2981a986 100644 --- a/server/src/tcp/tcp_listener.rs +++ b/server/src/tcp/tcp_listener.rs @@ -14,7 +14,7 @@ pub async fn start(address: &str, system: SharedSystem) -> SocketAddr { tokio::spawn(async move { let listener = TcpListener::bind(&address) .await - .expect("Unable to start TCP TLS server."); + .expect("Unable to start TCP server."); let local_addr = listener .local_addr()