Skip to content

Commit

Permalink
Cleanup HTTP client in SDK, change logout path, fix group id name (#889)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Apr 10, 2024
1 parent 84af526 commit 331c8ab
Show file tree
Hide file tree
Showing 45 changed files with 356 additions and 899 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ impl IggyCmdTestCase for TestConsumerGroupGetCmd {
TestTopicId::Named => self.topic_name.clone(),
};

let consumer_group_id = match self.using_group_id {
let group_id = match self.using_group_id {
TestConsumerGroupId::Numeric => format!("{}", self.group_id),
TestConsumerGroupId::Named => self.group_name.clone(),
};

let start_message = format!(
"Executing get consumer group with ID: {} for topic with ID: {} and stream with ID: {}",
consumer_group_id, topic_id, stream_id
group_id, topic_id, stream_id
);

command_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoD
let consumer_group = &client_info.consumer_groups[0];
assert_eq!(consumer_group.stream_id, STREAM_ID);
assert_eq!(consumer_group.topic_id, TOPIC_ID);
assert_eq!(consumer_group.consumer_group_id, CONSUMER_GROUP_ID);
assert_eq!(consumer_group.group_id, CONSUMER_GROUP_ID);

client_info
}
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let consumer_group = &me.consumer_groups[0];
assert_eq!(consumer_group.stream_id, STREAM_ID);
assert_eq!(consumer_group.topic_id, TOPIC_ID);
assert_eq!(consumer_group.consumer_group_id, CONSUMER_GROUP_ID);
assert_eq!(consumer_group.group_id, CONSUMER_GROUP_ID);

leave_consumer_group(&client).await;

Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.4.0"
version = "0.4.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/binary/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
GetConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
consumer_group_id: group_id.clone(),
group_id: group_id.clone(),
}
.as_bytes(),
)
Expand Down Expand Up @@ -92,7 +92,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
DeleteConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
consumer_group_id: group_id.clone(),
group_id: group_id.clone(),
}
.as_bytes(),
)
Expand All @@ -112,7 +112,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
JoinConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
consumer_group_id: group_id.clone(),
group_id: group_id.clone(),
}
.as_bytes(),
)
Expand All @@ -132,7 +132,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
LeaveConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
consumer_group_id: group_id.clone(),
group_id: group_id.clone(),
}
.as_bytes(),
)
Expand Down
7 changes: 3 additions & 4 deletions sdk/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,18 @@ pub fn map_client(payload: Bytes) -> Result<ClientInfoDetails, IggyError> {
for _ in 0..client.consumer_groups_count {
let stream_id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let topic_id = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let consumer_group_id =
u32::from_le_bytes(payload[position + 8..position + 12].try_into()?);
let group_id = u32::from_le_bytes(payload[position + 8..position + 12].try_into()?);
let consumer_group = ConsumerGroupInfo {
stream_id,
topic_id,
consumer_group_id,
group_id,
};
consumer_groups.push(consumer_group);
position += 12;
}
}

consumer_groups.sort_by(|x, y| x.consumer_group_id.cmp(&y.consumer_group_id));
consumer_groups.sort_by(|x, y| x.group_id.cmp(&y.group_id));
let client = ClientInfoDetails {
client_id: client.client_id,
user_id: client.user_id,
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/cli/client/get_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl CliCommand for GetClientCmd {
consumer_groups.add_row(vec![
format!("{}", consumer_group.stream_id).as_str(),
format!("{}", consumer_group.topic_id).as_str(),
format!("{}", consumer_group.consumer_group_id).as_str(),
format!("{}", consumer_group.group_id).as_str(),
]);
}

Expand Down
12 changes: 6 additions & 6 deletions sdk/src/cli/consumer_group/delete_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ pub struct DeleteConsumerGroupCmd {
}

impl DeleteConsumerGroupCmd {
pub fn new(stream_id: Identifier, topic_id: Identifier, consumer_group_id: Identifier) -> Self {
pub fn new(stream_id: Identifier, topic_id: Identifier, group_id: Identifier) -> Self {
Self {
delete_consumer_group: DeleteConsumerGroup {
stream_id,
topic_id,
consumer_group_id,
group_id,
},
}
}
Expand All @@ -27,26 +27,26 @@ impl CliCommand for DeleteConsumerGroupCmd {
fn explain(&self) -> String {
format!(
"delete consumer group with ID: {} for topic with ID: {} and stream with ID: {}",
self.delete_consumer_group.consumer_group_id,
self.delete_consumer_group.group_id,
self.delete_consumer_group.topic_id,
self.delete_consumer_group.stream_id,
)
}

async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
client
.delete_consumer_group(&self.delete_consumer_group.stream_id, &self.delete_consumer_group.topic_id, &self.delete_consumer_group.consumer_group_id)
.delete_consumer_group(&self.delete_consumer_group.stream_id, &self.delete_consumer_group.topic_id, &self.delete_consumer_group.group_id)
.await
.with_context(|| {
format!(
"Problem deleting consumer group with ID: {} for topic with ID: {} and stream with ID: {}",
self.delete_consumer_group.consumer_group_id, self.delete_consumer_group.topic_id, self.delete_consumer_group.stream_id
self.delete_consumer_group.group_id, self.delete_consumer_group.topic_id, self.delete_consumer_group.stream_id
)
})?;

event!(target: PRINT_TARGET, Level::INFO,
"Consumer group with ID: {} deleted for topic with ID: {} and stream with ID: {}",
self.delete_consumer_group.consumer_group_id,
self.delete_consumer_group.group_id,
self.delete_consumer_group.topic_id,
self.delete_consumer_group.stream_id,
);
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/cli/consumer_group/get_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl GetConsumerGroupCmd {
get_consumer_group: GetConsumerGroup {
stream_id,
topic_id,
consumer_group_id,
group_id: consumer_group_id,
},
}
}
Expand All @@ -28,20 +28,20 @@ impl CliCommand for GetConsumerGroupCmd {
fn explain(&self) -> String {
format!(
"get consumer group with ID: {} for topic with ID: {} and stream with ID: {}",
self.get_consumer_group.consumer_group_id,
self.get_consumer_group.group_id,
self.get_consumer_group.topic_id,
self.get_consumer_group.stream_id,
)
}

async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
let consumer_group = client
.get_consumer_group(&self.get_consumer_group.stream_id, &self.get_consumer_group.topic_id, &self.get_consumer_group.consumer_group_id)
.get_consumer_group(&self.get_consumer_group.stream_id, &self.get_consumer_group.topic_id, &self.get_consumer_group.group_id)
.await
.with_context(|| {
format!(
"Problem getting consumer group with ID: {} for topic with ID: {} and stream with ID: {}",
self.get_consumer_group.consumer_group_id, self.get_consumer_group.topic_id, self.get_consumer_group.stream_id
self.get_consumer_group.group_id, self.get_consumer_group.topic_id, self.get_consumer_group.stream_id
)
})?;

Expand Down
8 changes: 4 additions & 4 deletions sdk/src/consumer_groups/create_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::str::from_utf8;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `consumer_group_id` - unique consumer group ID.
/// - `group_id` - unique consumer group ID.
/// - `name` - unique consumer group name, max length is 255 characters. The name will be always converted to lowercase and all whitespaces will be replaced with dots.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct CreateConsumerGroup {
Expand Down Expand Up @@ -152,15 +152,15 @@ mod tests {
fn should_be_deserialized_from_bytes() {
let stream_id = Identifier::numeric(1).unwrap();
let topic_id = Identifier::numeric(2).unwrap();
let consumer_group_id = 3u32;
let group_id = 3u32;
let name = "test".to_string();
let stream_id_bytes = stream_id.as_bytes();
let topic_id_bytes = topic_id.as_bytes();
let mut bytes =
BytesMut::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len() + name.len());
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
bytes.put_u32_le(consumer_group_id);
bytes.put_u32_le(group_id);
#[allow(clippy::cast_possible_truncation)]
bytes.put_u8(name.len() as u8);
bytes.put_slice(name.as_bytes());
Expand All @@ -170,7 +170,7 @@ mod tests {
let command = command.unwrap();
assert_eq!(command.stream_id, stream_id);
assert_eq!(command.topic_id, topic_id);
assert_eq!(command.group_id.unwrap(), consumer_group_id);
assert_eq!(command.group_id.unwrap(), group_id);
assert_eq!(command.name, name);
}
}
36 changes: 16 additions & 20 deletions sdk/src/consumer_groups/delete_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `consumer_group_id` - unique consumer group ID (numeric or name).
/// - `group_id` - unique consumer group ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct DeleteConsumerGroup {
/// Unique stream ID (numeric or name).
Expand All @@ -22,7 +22,7 @@ pub struct DeleteConsumerGroup {
pub topic_id: Identifier,
/// Unique consumer group ID (numeric or name).
#[serde(skip)]
pub consumer_group_id: Identifier,
pub group_id: Identifier,
}

impl CommandPayload for DeleteConsumerGroup {}
Expand All @@ -37,13 +37,13 @@ impl BytesSerializable for DeleteConsumerGroup {
fn as_bytes(&self) -> Bytes {
let stream_id_bytes = self.stream_id.as_bytes();
let topic_id_bytes = self.topic_id.as_bytes();
let consumer_group_id_bytes = self.consumer_group_id.as_bytes();
let group_id_bytes = self.group_id.as_bytes();
let mut bytes = BytesMut::with_capacity(
stream_id_bytes.len() + topic_id_bytes.len() + consumer_group_id_bytes.len(),
stream_id_bytes.len() + topic_id_bytes.len() + group_id_bytes.len(),
);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
bytes.put_slice(&consumer_group_id_bytes);
bytes.put_slice(&group_id_bytes);
bytes.freeze()
}

Expand All @@ -57,11 +57,11 @@ impl BytesSerializable for DeleteConsumerGroup {
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes() as usize;
let consumer_group_id = Identifier::from_bytes(bytes.slice(position..))?;
let group_id = Identifier::from_bytes(bytes.slice(position..))?;
let command = DeleteConsumerGroup {
stream_id,
topic_id,
consumer_group_id,
group_id,
};
command.validate()?;
Ok(command)
Expand All @@ -70,11 +70,7 @@ impl BytesSerializable for DeleteConsumerGroup {

impl Display for DeleteConsumerGroup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}|{}|{}",
self.stream_id, self.topic_id, self.consumer_group_id
)
write!(f, "{}|{}|{}", self.stream_id, self.topic_id, self.group_id)
}
}

Expand All @@ -87,7 +83,7 @@ mod tests {
let command = DeleteConsumerGroup {
stream_id: Identifier::numeric(1).unwrap(),
topic_id: Identifier::numeric(2).unwrap(),
consumer_group_id: Identifier::numeric(3).unwrap(),
group_id: Identifier::numeric(3).unwrap(),
};

let bytes = command.as_bytes();
Expand All @@ -96,34 +92,34 @@ mod tests {
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
position += topic_id.get_size_bytes() as usize;
let consumer_group_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
let group_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();

assert!(!bytes.is_empty());
assert_eq!(stream_id, command.stream_id);
assert_eq!(topic_id, command.topic_id);
assert_eq!(consumer_group_id, command.consumer_group_id);
assert_eq!(group_id, command.group_id);
}

#[test]
fn should_be_deserialized_from_bytes() {
let stream_id = Identifier::numeric(1).unwrap();
let topic_id = Identifier::numeric(2).unwrap();
let consumer_group_id = Identifier::numeric(3).unwrap();
let group_id = Identifier::numeric(3).unwrap();
let stream_id_bytes = stream_id.as_bytes();
let topic_id_bytes = topic_id.as_bytes();
let consumer_group_id_bytes = consumer_group_id.as_bytes();
let group_id_bytes = group_id.as_bytes();
let mut bytes = BytesMut::with_capacity(
stream_id_bytes.len() + topic_id_bytes.len() + consumer_group_id_bytes.len(),
stream_id_bytes.len() + topic_id_bytes.len() + group_id_bytes.len(),
);
bytes.put_slice(&stream_id_bytes);
bytes.put_slice(&topic_id_bytes);
bytes.put_slice(&consumer_group_id_bytes);
bytes.put_slice(&group_id_bytes);
let command = DeleteConsumerGroup::from_bytes(bytes.freeze());
assert!(command.is_ok());

let command = command.unwrap();
assert_eq!(command.stream_id, stream_id);
assert_eq!(command.topic_id, topic_id);
assert_eq!(command.consumer_group_id, consumer_group_id);
assert_eq!(command.group_id, group_id);
}
}
Loading

0 comments on commit 331c8ab

Please sign in to comment.