diff --git a/Cargo.lock b/Cargo.lock index 359471c97..d12902418 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3827,7 +3827,7 @@ dependencies = [ [[package]] name = "server" -version = "0.2.19" +version = "0.2.20" dependencies = [ "aes-gcm", "anyhow", diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 3ba1391a2..05cb8c2ea 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -245,7 +245,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { let messages_count = 10; let now = IggyTimestamp::now().to_micros(); let message_expiry = message_expiry as u64; - let mut expired_timestamp = now - (1000 * 2 * message_expiry); + let mut expired_timestamp = now - (1000000 * 2 * message_expiry); let mut base_offset = 0; let mut last_timestamp = 0; let mut batch_buffer = BytesMut::new(); @@ -323,8 +323,8 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() .await; let now = IggyTimestamp::now().to_micros(); let message_expiry = message_expiry as u64; - let expired_timestamp = now - (1000 * 2 * message_expiry); - let not_expired_timestamp = now - (1000 * message_expiry) + 1; + let expired_timestamp = now - (1000000 * 2 * message_expiry); + let not_expired_timestamp = now - (1000000 * message_expiry) + 1; let expired_message = create_message(0, "test", expired_timestamp); let not_expired_message = create_message(1, "test", not_expired_timestamp); diff --git a/sdk/src/error.rs b/sdk/src/error.rs index d7fdabedc..2660ee00e 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -70,6 +70,8 @@ pub enum IggyError { InvalidPersonalAccessToken = 53, #[error("Personal access token: {0} for user with ID: {1} has expired.")] PersonalAccessTokenExpired(String, u32) = 54, + #[error("Users limit reached.")] + UsersLimitReached = 55, #[error("Not connected")] NotConnected = 61, #[error("Request error")] diff --git a/server/Cargo.toml b/server/Cargo.toml index 434e7bd81..a2366f2b4 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.2.19" +version = "0.2.20" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 19de899c2..641b499dc 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -133,7 +133,8 @@ impl Segment { } let last_message = &last_messages[0]; - let message_expiry = (self.message_expiry.unwrap() * 1000) as u64; + // Message expiry is in seconds, and timestamp is in microseconds + let message_expiry = (self.message_expiry.unwrap() * 1000000) as u64; (last_message.timestamp + message_expiry) <= now } diff --git a/server/src/streaming/systems/partitions.rs b/server/src/streaming/systems/partitions.rs index 392cbecce..c2ad53cbd 100644 --- a/server/src/streaming/systems/partitions.rs +++ b/server/src/streaming/systems/partitions.rs @@ -15,7 +15,7 @@ impl System { { let stream = self.get_stream(stream_id)?; let topic = stream.get_topic(topic_id)?; - self.permissioner.create_partitons( + self.permissioner.create_partitions( session.get_user_id(), stream.stream_id, topic.topic_id, diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 4b280653e..cf44c6e83 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -261,7 +261,6 @@ impl System { .decrement_partitions(stream.get_partitions_count()); self.metrics.decrement_messages(stream.get_messages_count()); self.metrics.decrement_segments(stream.get_segments_count()); - self.streams.remove(&stream_id); self.streams_ids.remove(&stream_name); let current_stream_id = CURRENT_STREAM_ID.load(Ordering::SeqCst); diff --git a/server/src/streaming/systems/topics.rs b/server/src/streaming/systems/topics.rs index 9e5f3fd79..94f285394 100644 --- a/server/src/streaming/systems/topics.rs +++ b/server/src/streaming/systems/topics.rs @@ -134,6 +134,7 @@ impl System { .get_stream_mut(stream_id)? .delete_topic(topic_id) .await?; + self.metrics.decrement_topics(1); self.metrics .decrement_partitions(topic.get_partitions_count()); diff --git a/server/src/streaming/systems/users.rs b/server/src/streaming/systems/users.rs index 3c2c5f736..b8419d08a 100644 --- a/server/src/streaming/systems/users.rs +++ b/server/src/streaming/systems/users.rs @@ -14,6 +14,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use tracing::{error, info, warn}; static USER_ID: AtomicU32 = AtomicU32::new(1); +const MAX_USERS: usize = u32::MAX as usize; impl System { pub(crate) async fn load_users(&mut self) -> Result<(), IggyError> { @@ -31,6 +32,7 @@ impl System { let current_user_id = users.iter().map(|user| user.id).max().unwrap_or(1); USER_ID.store(current_user_id + 1, Ordering::SeqCst); self.permissioner.init(users); + self.metrics.increment_users(users_count as u32); info!("Initialized {} user(s).", users_count); Ok(()) } @@ -121,6 +123,12 @@ impl System { error!("User: {username} already exists."); return Err(IggyError::UserAlreadyExists); } + + if self.storage.user.load_all().await?.len() > MAX_USERS { + error!("Available users limit reached."); + return Err(IggyError::UsersLimitReached); + } + let user_id = USER_ID.fetch_add(1, Ordering::SeqCst); info!("Creating user: {username} with ID: {user_id}..."); let user = User::new(user_id, &username, password, status, permissions); diff --git a/server/src/streaming/users/permissioner_rules/partitions.rs b/server/src/streaming/users/permissioner_rules/partitions.rs index fad08d8e3..a5e0da935 100644 --- a/server/src/streaming/users/permissioner_rules/partitions.rs +++ b/server/src/streaming/users/permissioner_rules/partitions.rs @@ -2,7 +2,7 @@ use crate::streaming::users::permissioner::Permissioner; use iggy::error::IggyError; impl Permissioner { - pub fn create_partitons( + pub fn create_partitions( &self, user_id: u32, stream_id: u32,