From 7e0b0a7710f3a5d1f7db9ce3c92c2275ed2e7dd3 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Sat, 7 Dec 2024 23:53:37 +0100 Subject: [PATCH] Fix storage --- backend/backend_config.toml | 2 +- backend/src/config.rs | 15 ++++++---- backend/src/http/error.rs | 8 +++++- backend/src/http/mod.rs | 2 +- backend/src/http/v1/check_email/post.rs | 28 ++++++++++++++++--- .../src/storage/commercial_license_trial.rs | 7 +++++ backend/src/storage/mod.rs | 4 +++ backend/src/storage/postgres.rs | 7 +++++ backend/src/worker/consume.rs | 2 +- backend/src/worker/do_work.rs | 2 +- backend/src/worker/mod.rs | 2 +- .../worker/{response.rs => single_shot.rs} | 0 12 files changed, 64 insertions(+), 15 deletions(-) rename backend/src/worker/{response.rs => single_shot.rs} (100%) diff --git a/backend/backend_config.toml b/backend/backend_config.toml index d0f976ee6..c5b28bb26 100644 --- a/backend/backend_config.toml +++ b/backend/backend_config.toml @@ -151,4 +151,4 @@ concurrency = 5 # initiated the verification request in a multi-tenant system. # # Env variable: RCH__STORAGE__0__POSTGRES__TABLE_NAME -# extra = {"my_custom_key": "my_custom_value"} +# extra = { "my_custom_key" = "my_custom_value" } diff --git a/backend/src/config.rs b/backend/src/config.rs index bd8066ca4..1ad69b4db 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -27,8 +27,8 @@ use config::Config; use lapin::Channel; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use std::collections::HashMap; use std::sync::Arc; -use std::{any::Any, collections::HashMap}; use tracing::warn; #[derive(Debug, Default, Serialize, Deserialize)] @@ -150,11 +150,16 @@ impl BackendConfig { /// /// This is quite hacky, and it will most probably be refactored away in /// future versions. We however need to rethink how to do the `/v1/bulk` - /// endpoints first. + /// endpoints first. Simply using downcasting should be a warning sign that + /// we're doing something wrong. + /// + /// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 pub fn get_pg_pool(&self) -> Option { - self.storages - .iter() - .find_map(|s| ::downcast_ref::(s).map(|s| s.pg_pool.clone())) + self.storages.iter().find_map(|s| { + s.as_any() + .downcast_ref::() + .map(|s| s.pg_pool.clone()) + }) } } diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs index 0428994f5..af4c095f2 100644 --- a/backend/src/http/error.rs +++ b/backend/src/http/error.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use crate::storage::error::StorageError; use check_if_email_exists::{CheckEmailInputBuilderError, LOG_TARGET}; use serde::ser::SerializeStruct; use serde::Serialize; @@ -24,7 +25,6 @@ use warp::{http::StatusCode, reject}; /// Trait combining Display and Debug. pub trait DisplayDebug: fmt::Display + Debug + Sync + Send {} - impl DisplayDebug for T {} /// Struct describing an error response. @@ -113,6 +113,12 @@ impl From for ReacherResponseError { } } +impl From for ReacherResponseError { + fn from(e: StorageError) -> Self { + ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e) + } +} + /// This function receives a `Rejection` and tries to return a custom value, /// otherwise simply passes the rejection along. pub async fn handle_rejection(err: warp::Rejection) -> Result { diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs index e94f62f2a..be5fe373b 100644 --- a/backend/src/http/mod.rs +++ b/backend/src/http/mod.rs @@ -112,7 +112,7 @@ pub fn with_db( pool.ok_or_else(|| { warp::reject::custom(ReacherResponseError::new( StatusCode::SERVICE_UNAVAILABLE, - "Please configure a database on Reacher before calling this endpoint", + "Please configure a Postgres database on Reacher before calling this endpoint", )) }) } diff --git a/backend/src/http/v1/check_email/post.rs b/backend/src/http/v1/check_email/post.rs index 97c383ea8..ddd595d0b 100644 --- a/backend/src/http/v1/check_email/post.rs +++ b/backend/src/http/v1/check_email/post.rs @@ -17,7 +17,7 @@ //! This file implements the `POST /v1/check_email` endpoint. use check_if_email_exists::{check_email, LOG_TARGET}; -use futures::StreamExt; +use futures::{StreamExt, TryFutureExt}; use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicRejectOptions, QueueDeclareOptions, }; @@ -33,7 +33,7 @@ use crate::http::v1::bulk::post::publish_task; use crate::http::{check_header, ReacherResponseError}; use crate::worker::consume::MAX_QUEUE_PRIORITY; use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask}; -use crate::worker::response::SingleShotReply; +use crate::worker::single_shot::SingleShotReply; /// The main endpoint handler that implements the logic of this route. async fn http_handler( @@ -51,8 +51,28 @@ async fn http_handler( // If worker mode is disabled, we do a direct check, and skip rabbitmq. if !config.worker.enable { - let result = check_email(&body.to_check_email_input(Arc::clone(&config))).await; - let result_bz = serde_json::to_vec(&result).map_err(ReacherResponseError::from)?; + let input = body.to_check_email_input(Arc::clone(&config)); + let result = check_email(&input).await; + let value = Ok(result); + + // Also store the result "manually", since we don't have a worker. + for storage in config.get_storages() { + storage + .store( + &CheckEmailTask { + input: input.clone(), + job_id: CheckEmailJobId::SingleShot, + webhook: None, + }, + &value, + storage.get_extra(), + ) + .map_err(ReacherResponseError::from) + .await?; + } + + let result_bz = serde_json::to_vec(&value).map_err(ReacherResponseError::from)?; + return Ok(warp::reply::with_header( result_bz, "Content-Type", diff --git a/backend/src/storage/commercial_license_trial.rs b/backend/src/storage/commercial_license_trial.rs index 71c3f611d..54a38d79e 100644 --- a/backend/src/storage/commercial_license_trial.rs +++ b/backend/src/storage/commercial_license_trial.rs @@ -21,6 +21,7 @@ use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError}; use async_trait::async_trait; use check_if_email_exists::{redact, CheckEmailOutput, LOG_TARGET}; use serde_json::Value; +use std::any::Any; use tracing::debug; /// Storage that's baked in the software for users of the Commercial License @@ -101,6 +102,12 @@ impl Storage for CommercialLicenseTrialStorage { fn get_extra(&self) -> Option { self.postgres_storage.get_extra() } + + // This is a workaround to allow downcasting to Any, and should be removed + // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 + fn as_any(&self) -> &dyn Any { + self + } } /// Redact all sensitive data by recursively traversing the JSON object. diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs index fcd1afcea..1daa514c5 100644 --- a/backend/src/storage/mod.rs +++ b/backend/src/storage/mod.rs @@ -35,4 +35,8 @@ pub trait Storage: Debug + Send + Sync + Any { ) -> Result<(), StorageError>; fn get_extra(&self) -> Option; + + // This is a workaround to allow downcasting to Any, and should be removed + // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 + fn as_any(&self) -> &dyn Any; } diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs index 618c23776..ff0523224 100644 --- a/backend/src/storage/postgres.rs +++ b/backend/src/storage/postgres.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; +use std::any::Any; use tracing::{debug, info}; #[derive(Debug)] @@ -104,4 +105,10 @@ impl Storage for PostgresStorage { fn get_extra(&self) -> Option { self.extra.clone() } + + // This is a workaround to allow downcasting to Any, and should be removed + // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/backend/src/worker/consume.rs b/backend/src/worker/consume.rs index 5dc0d5b78..b46003287 100644 --- a/backend/src/worker/consume.rs +++ b/backend/src/worker/consume.rs @@ -15,7 +15,7 @@ // along with this program. If not, see . use super::do_work::{do_check_email_work, CheckEmailTask, TaskError}; -use super::response::send_single_shot_reply; +use super::single_shot::send_single_shot_reply; use crate::config::{BackendConfig, RabbitMQConfig, ThrottleConfig}; use crate::worker::do_work::CheckEmailJobId; use anyhow::Context; diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs index 4c530b620..6d7e7dec1 100644 --- a/backend/src/worker/do_work.rs +++ b/backend/src/worker/do_work.rs @@ -15,7 +15,7 @@ // along with this program. If not, see . use crate::config::BackendConfig; -use crate::worker::response::send_single_shot_reply; +use crate::worker::single_shot::send_single_shot_reply; use check_if_email_exists::{ check_email, CheckEmailInput, CheckEmailOutput, Reachable, LOG_TARGET, }; diff --git a/backend/src/worker/mod.rs b/backend/src/worker/mod.rs index 90b474c33..bffe565a6 100644 --- a/backend/src/worker/mod.rs +++ b/backend/src/worker/mod.rs @@ -22,6 +22,6 @@ pub mod consume; pub mod do_work; -pub mod response; +pub mod single_shot; pub use consume::{run_worker, setup_rabbit_mq}; diff --git a/backend/src/worker/response.rs b/backend/src/worker/single_shot.rs similarity index 100% rename from backend/src/worker/response.rs rename to backend/src/worker/single_shot.rs