diff --git a/editoast/Cargo.lock b/editoast/Cargo.lock index 66274f6c67..1a03249e39 100644 --- a/editoast/Cargo.lock +++ b/editoast/Cargo.lock @@ -1119,11 +1119,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "deadpool-lapin" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c7b14064f854a3969735e7c948c677a57ef17ca7f0bc029da8fe2e5e0fc1eb" +dependencies = [ + "deadpool", + "lapin", + "tokio-executor-trait", +] + [[package]] name = "deadpool-runtime" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] [[package]] name = "deprecate-until" @@ -1337,6 +1351,7 @@ dependencies = [ "clap", "colored", "dashmap", + "deadpool-lapin", "derivative", "diesel", "diesel-async", @@ -4511,6 +4526,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-executor-trait" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "802ccf58e108fe16561f35348fabe15ff38218968f033d587e399a84937533cc" +dependencies = [ + "async-trait", + "executor-trait", + "tokio", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" diff --git a/editoast/Cargo.toml b/editoast/Cargo.toml index 40f9acb5be..3607fff789 100644 --- a/editoast/Cargo.toml +++ b/editoast/Cargo.toml @@ -96,6 +96,7 @@ chrono.workspace = true clap = { version = "4.5.19", features = ["derive", "env"] } colored = "2.1.0" dashmap = "6.1.0" +deadpool-lapin = "0.12.1" derivative.workspace = true diesel.workspace = true diesel-async = { workspace = true } diff --git a/editoast/openapi.yaml b/editoast/openapi.yaml index 01c7c92365..7ef5ae7b90 100644 --- a/editoast/openapi.yaml +++ b/editoast/openapi.yaml @@ -4068,6 +4068,8 @@ components: - $ref: '#/components/schemas/EditoastEditoastUrlErrorInvalidUrl' - $ref: '#/components/schemas/EditoastElectricalProfilesErrorNotFound' - $ref: '#/components/schemas/EditoastErrorConnectionDoesNotExist' + - $ref: '#/components/schemas/EditoastErrorCreatePoolLapin' + - $ref: '#/components/schemas/EditoastErrorDeadpoolLapin' - $ref: '#/components/schemas/EditoastErrorLapin' - $ref: '#/components/schemas/EditoastErrorResponseTimeout' - $ref: '#/components/schemas/EditoastErrorSerialization' @@ -4148,6 +4150,44 @@ components: type: string enum: - editoast:coreclient:ConnectionDoesNotExist + EditoastErrorCreatePoolLapin: + type: object + required: + - type + - status + - message + properties: + context: + type: object + message: + type: string + status: + type: integer + enum: + - 500 + type: + type: string + enum: + - editoast:coreclient:CreatePoolLapin + EditoastErrorDeadpoolLapin: + type: object + required: + - type + - status + - message + properties: + context: + type: object + message: + type: string + status: + type: integer + enum: + - 500 + type: + type: string + enum: + - editoast:coreclient:DeadpoolLapin EditoastErrorLapin: type: object required: diff --git a/editoast/src/core/mq_client.rs b/editoast/src/core/mq_client.rs index 9d6e791119..5d1a0ace62 100644 --- a/editoast/src/core/mq_client.rs +++ b/editoast/src/core/mq_client.rs @@ -1,23 +1,21 @@ +use deadpool_lapin::{Config, CreatePoolError, Pool, PoolError, Runtime}; use editoast_derive::EditoastError; use futures_util::StreamExt; use itertools::Itertools; use lapin::{ options::{BasicConsumeOptions, BasicPublishOptions}, types::{ByteArray, FieldTable, ShortString}, - BasicProperties, Connection, ConnectionProperties, + BasicProperties, }; use serde::Serialize; use serde_json::to_vec; -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; use thiserror::Error; -use tokio::{ - sync::RwLock, - time::{timeout, Duration}, -}; +use tokio::time::{timeout, Duration}; #[derive(Debug, Clone)] pub struct RabbitMQClient { - connection: Arc>>, + pub pool: Pool, exchange: String, timeout: u64, hostname: String, @@ -51,6 +49,12 @@ pub enum Error { #[error("Connection does not exist")] #[editoast_error(status = "500")] ConnectionDoesNotExist, + #[error("Cannot create the pool")] + #[editoast_error(status = "500")] + CreatePoolLapin(CreatePoolError), + #[error("Cannot acquire connection from pool")] + #[editoast_error(status = "500")] + DeadpoolLapin(PoolError), } pub struct MQResponse { @@ -64,61 +68,22 @@ impl RabbitMQClient { .map(|name| name.to_string_lossy().into_owned()) .unwrap_or_else(|_| "unknown".to_string()); - let conn = Arc::new(RwLock::new(None)); - - tokio::spawn(Self::connection_loop(options.uri, conn.clone())); + let cfg = Config { + url: Some(options.uri), + ..Default::default() + }; + let pool = cfg + .create_pool(Some(Runtime::Tokio1)) + .map_err(Error::CreatePoolLapin)?; Ok(RabbitMQClient { - connection: conn, + pool, exchange: format!("{}-req-xchg", options.worker_pool_identifier), timeout: options.timeout, hostname, }) } - async fn connection_ok(connection: &Arc>>) -> bool { - let guard = connection.as_ref().read().await; - let conn = guard.as_ref(); - let status = match conn { - None => return false, - Some(conn) => conn.status().state(), - }; - match status { - lapin::ConnectionState::Initial => true, - lapin::ConnectionState::Connecting => true, - lapin::ConnectionState::Connected => true, - lapin::ConnectionState::Closing => true, - lapin::ConnectionState::Closed => false, - lapin::ConnectionState::Error => false, - } - } - - async fn connection_loop(uri: String, connection: Arc>>) { - loop { - if Self::connection_ok(&connection).await { - tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - - tracing::info!("Reconnecting to RabbitMQ"); - - // Connection should be re-established - let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await; - - match new_connection { - Ok(new_connection) => { - *connection.write().await = Some(new_connection); - tracing::info!("Reconnected to RabbitMQ"); - } - Err(e) => { - tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e); - } - } - - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - #[allow(dead_code)] pub async fn call( &self, @@ -131,14 +96,8 @@ impl RabbitMQClient { where T: Serialize, { - // Get current connection - let connection = self.connection.read().await; - if connection.is_none() { - return Err(Error::ConnectionDoesNotExist); - } - let connection = connection.as_ref().unwrap(); - // Create a channel + let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?; let channel = connection.create_channel().await.map_err(Error::Lapin)?; let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?; @@ -172,6 +131,12 @@ impl RabbitMQClient { .await .map_err(Error::Lapin)?; + // Explicitly close the channel + channel + .close(200, "Normal shutdown") + .await + .map_err(Error::Lapin)?; + Ok(()) } @@ -186,14 +151,8 @@ impl RabbitMQClient { where T: Serialize, { - // Get current connection - let connection = self.connection.read().await; - if connection.is_none() { - return Err(Error::ConnectionDoesNotExist); - } - let connection = connection.as_ref().unwrap(); - // Create a channel + let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?; let channel = connection.create_channel().await.map_err(Error::Lapin)?; let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?; @@ -244,10 +203,20 @@ impl RabbitMQClient { Duration::from_secs(override_timeout.unwrap_or(self.timeout)), consumer.next(), ) - .await - .map_err(|_| Error::ResponseTimeout)?; + .await; - match response_delivery { + if response_delivery.is_err() { + channel + .close(200, "Normal shutdown") + .await + .map_err(Error::Lapin)?; + + return Err(Error::ResponseTimeout); + } + + let response_delivery = response_delivery.unwrap(); + + let result = match response_delivery { Some(Ok(delivery)) => { let status = delivery .properties @@ -265,7 +234,15 @@ impl RabbitMQClient { } Some(Err(e)) => Err(e.into()), None => panic!("Rabbitmq consumer was cancelled unexpectedly"), - } + }; + + // Explicitly close the channel + channel + .close(200, "Normal shutdown") + .await + .map_err(Error::Lapin)?; + + result } } diff --git a/front/public/locales/en/errors.json b/front/public/locales/en/errors.json index f7f2bc4cac..e6586c7801 100644 --- a/front/public/locales/en/errors.json +++ b/front/public/locales/en/errors.json @@ -47,7 +47,9 @@ "Serialization": "Core: cannot serialize request", "StatusParsing": "Core: cannot parse status", "UnparsableErrorOutput": "Core returned an error in an unknown format", - "ConnectionDoesNotExist": "Core: message queue: connection not established" + "ConnectionDoesNotExist": "Core: message queue: connection not established", + "CreatePoolLapin": "Core: message queue: cannot create pool", + "DeadpoolLapin": "Core: message queue: pool error" }, "DatabaseAccessError": "Database access fatal error", "document": { diff --git a/front/public/locales/fr/errors.json b/front/public/locales/fr/errors.json index d42e8269cc..4295573b56 100644 --- a/front/public/locales/fr/errors.json +++ b/front/public/locales/fr/errors.json @@ -47,7 +47,9 @@ "Serialization": "Core: impossible de sérialiser la requête", "StatusParsing": "Core: impossible d'obtenir le status", "UnparsableErrorOutput": "Core: a renvoyé une erreur dans un format inconnu", - "ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie" + "ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie", + "CreatePoolLapin": "Core: file d'attente de messages: erreur de création de pool", + "DeadpoolLapin": "Core: file d'attente de messages: erreur de pool" }, "document": { "NotFound": "Document '{{document_key}}' non trouvé"