From 63b18c2658423969016f3038b0065634c97e8fc1 Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Wed, 4 Sep 2024 16:03:48 +0200 Subject: [PATCH] Cleanup connection handler, add session logs (#1216) --- Cargo.lock | 2 +- server/Cargo.toml | 2 +- server/src/quic/listener.rs | 12 ++--- .../src/streaming/clients/client_manager.rs | 15 ++---- server/src/streaming/systems/clients.rs | 53 ++++++------------- server/src/tcp/connection_handler.rs | 11 +--- server/src/tcp/tcp_listener.rs | 18 +++++-- server/src/tcp/tcp_tls_listener.rs | 17 ++++-- 8 files changed, 56 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b3edf2a99..b6b818ec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3864,7 +3864,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.30" +version = "0.4.31" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/server/Cargo.toml b/server/Cargo.toml index bc7044b85..ade36dddd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.30" +version = "0.4.31" edition = "2021" build = "src/build.rs" diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs index 16d15324a..f9d1517c6 100644 --- a/server/src/quic/listener.rs +++ b/server/src/quic/listener.rs @@ -1,6 +1,3 @@ -use std::net::SocketAddr; -use std::sync::Arc; - use crate::binary::command; use crate::command::ServerCommand; use crate::quic::quic_sender::QuicSender; @@ -13,6 +10,7 @@ use bytes::Bytes; use iggy::validatable::Validatable; use iggy::{bytes_serializable::BytesSerializable, messages::MAX_PAYLOAD_SIZE}; use quinn::{Connection, Endpoint, RecvStream, SendStream}; +use std::sync::Arc; use tracing::{debug, error, info}; const LISTENERS_COUNT: u32 = 10; @@ -62,7 +60,7 @@ async fn handle_connection( .await; let session = Arc::new(Session::from_client_id(client_id, address)); - while let Some(stream) = accept_stream(&connection, &system, &address).await? { + while let Some(stream) = accept_stream(&connection, &system, client_id).await? { let system = system.clone(); let session = session.clone(); @@ -81,17 +79,17 @@ type BiStream = (SendStream, RecvStream); async fn accept_stream( connection: &Connection, system: &SharedSystem, - address: &SocketAddr, + client_id: u32, ) -> Result, ServerError> { match connection.accept_bi().await { Err(quinn::ConnectionError::ApplicationClosed { .. }) => { info!("Connection closed"); - system.read().await.delete_client(address).await; + system.read().await.delete_client(client_id).await; Ok(None) } Err(error) => { error!("Error when handling QUIC stream: {:?}", error); - system.read().await.delete_client(address).await; + system.read().await.delete_client(client_id).await; Err(error.into()) } Ok(stream) => Ok(Some(stream)), diff --git a/server/src/streaming/clients/client_manager.rs b/server/src/streaming/clients/client_manager.rs index ee872ba27..9bf1dfb8d 100644 --- a/server/src/streaming/clients/client_manager.rs +++ b/server/src/streaming/clients/client_manager.rs @@ -80,15 +80,7 @@ impl ClientManager { Ok(()) } - pub fn get_client_by_address( - &self, - address: &SocketAddr, - ) -> Result, IggyError> { - let id = hash::calculate_32(address.to_string().as_bytes()); - self.get_client_by_id(id) - } - - pub fn get_client_by_id(&self, client_id: u32) -> Result, IggyError> { + pub fn get_client(&self, client_id: u32) -> Result, IggyError> { let client = self.clients.get(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); @@ -119,9 +111,8 @@ impl ClientManager { Ok(()) } - pub fn delete_client(&mut self, address: &SocketAddr) -> Option> { - let id = hash::calculate_32(address.to_string().as_bytes()); - self.clients.remove(&id) + pub fn delete_client(&mut self, client_id: u32) -> Option> { + self.clients.remove(&client_id) } pub async fn join_consumer_group( diff --git a/server/src/streaming/systems/clients.rs b/server/src/streaming/systems/clients.rs index 7aac7105d..05cddc379 100644 --- a/server/src/streaming/systems/clients.rs +++ b/server/src/streaming/systems/clients.rs @@ -17,60 +17,41 @@ impl System { client_id } - pub async fn delete_client(&self, address: &SocketAddr) { + pub async fn delete_client(&self, client_id: u32) { let consumer_groups: Vec<(u32, u32, u32)>; - let client_id; { - let client_manager = self.client_manager.read().await; - let client = client_manager.get_client_by_address(address); - if client.is_err() { + let mut client_manager = self.client_manager.write().await; + let client = client_manager.delete_client(client_id); + if client.is_none() { + error!("Client with ID: {client_id} was not found in the client manager.",); return; } + self.metrics.decrement_clients(1); let client = client.unwrap(); let client = client.read().await; - client_id = client.client_id; - consumer_groups = client .consumer_groups .iter() .map(|c| (c.stream_id, c.topic_id, c.group_id)) .collect(); + + info!( + "Deleted {} client with ID: {} for IP address: {}", + client.transport, client.client_id, client.address + ); } - for (stream_id, topic_id, consumer_group_id) in consumer_groups.iter() { - if let Err(error) = self + for (stream_id, topic_id, consumer_group_id) in consumer_groups.into_iter() { + _ = self .leave_consumer_group_by_client( - &Identifier::numeric(*stream_id).unwrap(), - &Identifier::numeric(*topic_id).unwrap(), - &Identifier::numeric(*consumer_group_id).unwrap(), + &Identifier::numeric(stream_id).unwrap(), + &Identifier::numeric(topic_id).unwrap(), + &Identifier::numeric(consumer_group_id).unwrap(), client_id, ) .await - { - error!( - "Failed to leave consumer group with ID: {} by client with ID: {}. Error: {}", - consumer_group_id, client_id, error - ); - } - } - - { - let mut client_manager = self.client_manager.write().await; - let client = client_manager.delete_client(address); - if client.is_none() { - return; - } - - self.metrics.decrement_clients(1); - let client = client.unwrap(); - let client = client.read().await; - - info!( - "Deleted {} client with ID: {} for IP address: {}", - client.transport, client.client_id, client.address - ); } } @@ -82,7 +63,7 @@ impl System { self.ensure_authenticated(session)?; self.permissioner.get_client(session.get_user_id())?; let client_manager = self.client_manager.read().await; - client_manager.get_client_by_id(client_id) + client_manager.get_client(client_id) } pub async fn get_clients( diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs index 3c36302a1..082ef1501 100644 --- a/server/src/tcp/connection_handler.rs +++ b/server/src/tcp/connection_handler.rs @@ -2,7 +2,6 @@ use crate::binary::command; use crate::binary::sender::Sender; use crate::command::ServerCommand; use crate::server_error::ServerError; -use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use bytes::{BufMut, BytesMut}; @@ -10,23 +9,15 @@ use iggy::bytes_serializable::BytesSerializable; use iggy::error::IggyError; use iggy::validatable::Validatable; use std::io::ErrorKind; -use std::net::SocketAddr; use tracing::{debug, error, info}; const INITIAL_BYTES_LENGTH: usize = 4; pub(crate) async fn handle_connection( - address: SocketAddr, + session: Session, sender: &mut dyn Sender, system: SharedSystem, ) -> Result<(), ServerError> { - let client_id = system - .read() - .await - .add_client(&address, Transport::Tcp) - .await; - - let session = Session::from_client_id(client_id, address); let mut initial_buffer = [0u8; INITIAL_BYTES_LENGTH]; loop { let read_length = match sender.read(&mut initial_buffer).await { diff --git a/server/src/tcp/tcp_listener.rs b/server/src/tcp/tcp_listener.rs index 01abaef01..bb6e27d5c 100644 --- a/server/src/tcp/tcp_listener.rs +++ b/server/src/tcp/tcp_listener.rs @@ -1,3 +1,5 @@ +use crate::streaming::clients::client_manager::Transport; +use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_sender::TcpSender; @@ -28,19 +30,27 @@ pub async fn start(address: &str, system: SharedSystem) -> SocketAddr { loop { match listener.accept().await { Ok((stream, address)) => { - info!("Accepted new TCP connection: {}", address); + info!("Accepted new TCP connection: {address}"); + let client_id = system + .read() + .await + .add_client(&address, Transport::Tcp) + .await; + + let session = Session::from_client_id(client_id, address); + info!("Created new session: {session}"); let system = system.clone(); let mut sender = TcpSender { stream }; tokio::spawn(async move { if let Err(error) = - handle_connection(address, &mut sender, system.clone()).await + handle_connection(session, &mut sender, system.clone()).await { handle_error(error); - system.read().await.delete_client(&address).await; + system.read().await.delete_client(client_id).await; } }); } - Err(error) => error!("Unable to accept TCP socket, error: {}", error), + Err(error) => error!("Unable to accept TCP socket, error: {error}"), } } }); diff --git a/server/src/tcp/tcp_tls_listener.rs b/server/src/tcp/tcp_tls_listener.rs index a3f3072b1..9e4714294 100644 --- a/server/src/tcp/tcp_tls_listener.rs +++ b/server/src/tcp/tcp_tls_listener.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use crate::configs::tcp::TcpTlsConfig; +use crate::streaming::clients::client_manager::Transport; +use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_tls_sender::TcpTlsSender; @@ -49,20 +51,29 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys match listener.accept().await { Ok((stream, address)) => { info!("Accepted new TCP TLS connection: {}", address); + let client_id = system + .read() + .await + .add_client(&address, Transport::Tcp) + .await; + + let session = Session::from_client_id(client_id, address); + info!("Created new session: {session}"); + let acceptor = acceptor.clone(); let stream = acceptor.accept(stream).await.unwrap(); let system = system.clone(); let mut sender = TcpTlsSender { stream }; tokio::spawn(async move { if let Err(error) = - handle_connection(address, &mut sender, system.clone()).await + handle_connection(session, &mut sender, system.clone()).await { handle_error(error); - system.read().await.delete_client(&address).await; + system.read().await.delete_client(client_id).await; } }); } - Err(error) => error!("Unable to accept TCP TLS socket, error: {}", error), + Err(error) => error!("Unable to accept TCP TLS socket, error: {error}"), } } });