Skip to content

Commit

Permalink
Fix storage
Browse files Browse the repository at this point in the history
  • Loading branch information
amaury1093 committed Dec 7, 2024
1 parent 4d84f32 commit 7e0b0a7
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 15 deletions.
2 changes: 1 addition & 1 deletion backend/backend_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,4 @@ concurrency = 5
# initiated the verification request in a multi-tenant system.
#
# Env variable: RCH__STORAGE__0__POSTGRES__TABLE_NAME
# extra = {"my_custom_key": "my_custom_value"}
# extra = { "my_custom_key" = "my_custom_value" }
15 changes: 10 additions & 5 deletions backend/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use config::Config;
use lapin::Channel;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use std::{any::Any, collections::HashMap};
use tracing::warn;

#[derive(Debug, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -150,11 +150,16 @@ impl BackendConfig {
///
/// This is quite hacky, and it will most probably be refactored away in
/// future versions. We however need to rethink how to do the `/v1/bulk`
/// endpoints first.
/// endpoints first. Simply using downcasting should be a warning sign that
/// we're doing something wrong.
///
/// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
pub fn get_pg_pool(&self) -> Option<PgPool> {
self.storages
.iter()
.find_map(|s| <dyn Any>::downcast_ref::<PostgresStorage>(s).map(|s| s.pg_pool.clone()))
self.storages.iter().find_map(|s| {
s.as_any()
.downcast_ref::<PostgresStorage>()
.map(|s| s.pg_pool.clone())
})
}
}

Expand Down
8 changes: 7 additions & 1 deletion backend/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::storage::error::StorageError;
use check_if_email_exists::{CheckEmailInputBuilderError, LOG_TARGET};
use serde::ser::SerializeStruct;
use serde::Serialize;
Expand All @@ -24,7 +25,6 @@ use warp::{http::StatusCode, reject};

/// Trait combining Display and Debug.
pub trait DisplayDebug: fmt::Display + Debug + Sync + Send {}

impl<T: fmt::Display + Debug + Sync + Send> DisplayDebug for T {}

/// Struct describing an error response.
Expand Down Expand Up @@ -113,6 +113,12 @@ impl From<anyhow::Error> for ReacherResponseError {
}
}

impl From<StorageError> for ReacherResponseError {
fn from(e: StorageError) -> Self {
ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}

/// This function receives a `Rejection` and tries to return a custom value,
/// otherwise simply passes the rejection along.
pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub fn with_db(
pool.ok_or_else(|| {
warp::reject::custom(ReacherResponseError::new(
StatusCode::SERVICE_UNAVAILABLE,
"Please configure a database on Reacher before calling this endpoint",
"Please configure a Postgres database on Reacher before calling this endpoint",
))
})
}
Expand Down
28 changes: 24 additions & 4 deletions backend/src/http/v1/check_email/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! This file implements the `POST /v1/check_email` endpoint.
use check_if_email_exists::{check_email, LOG_TARGET};
use futures::StreamExt;
use futures::{StreamExt, TryFutureExt};
use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicRejectOptions, QueueDeclareOptions,
};
Expand All @@ -33,7 +33,7 @@ use crate::http::v1::bulk::post::publish_task;
use crate::http::{check_header, ReacherResponseError};
use crate::worker::consume::MAX_QUEUE_PRIORITY;
use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask};
use crate::worker::response::SingleShotReply;
use crate::worker::single_shot::SingleShotReply;

/// The main endpoint handler that implements the logic of this route.
async fn http_handler(
Expand All @@ -51,8 +51,28 @@ async fn http_handler(

// If worker mode is disabled, we do a direct check, and skip rabbitmq.
if !config.worker.enable {
let result = check_email(&body.to_check_email_input(Arc::clone(&config))).await;
let result_bz = serde_json::to_vec(&result).map_err(ReacherResponseError::from)?;
let input = body.to_check_email_input(Arc::clone(&config));
let result = check_email(&input).await;
let value = Ok(result);

// Also store the result "manually", since we don't have a worker.
for storage in config.get_storages() {
storage
.store(
&CheckEmailTask {
input: input.clone(),
job_id: CheckEmailJobId::SingleShot,
webhook: None,
},
&value,
storage.get_extra(),
)
.map_err(ReacherResponseError::from)
.await?;
}

let result_bz = serde_json::to_vec(&value).map_err(ReacherResponseError::from)?;

return Ok(warp::reply::with_header(
result_bz,
"Content-Type",
Expand Down
7 changes: 7 additions & 0 deletions backend/src/storage/commercial_license_trial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
use async_trait::async_trait;
use check_if_email_exists::{redact, CheckEmailOutput, LOG_TARGET};
use serde_json::Value;
use std::any::Any;
use tracing::debug;

/// Storage that's baked in the software for users of the Commercial License
Expand Down Expand Up @@ -101,6 +102,12 @@ impl Storage for CommercialLicenseTrialStorage {
fn get_extra(&self) -> Option<serde_json::Value> {
self.postgres_storage.get_extra()
}

// This is a workaround to allow downcasting to Any, and should be removed
// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
fn as_any(&self) -> &dyn Any {
self
}
}

/// Redact all sensitive data by recursively traversing the JSON object.
Expand Down
4 changes: 4 additions & 0 deletions backend/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ pub trait Storage: Debug + Send + Sync + Any {
) -> Result<(), StorageError>;

fn get_extra(&self) -> Option<serde_json::Value>;

// This is a workaround to allow downcasting to Any, and should be removed
// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
fn as_any(&self) -> &dyn Any;
}
7 changes: 7 additions & 0 deletions backend/src/storage/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use check_if_email_exists::{CheckEmailOutput, LOG_TARGET};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::any::Any;
use tracing::{debug, info};

#[derive(Debug)]
Expand Down Expand Up @@ -104,4 +105,10 @@ impl Storage for PostgresStorage {
fn get_extra(&self) -> Option<serde_json::Value> {
self.extra.clone()
}

// This is a workaround to allow downcasting to Any, and should be removed
// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
fn as_any(&self) -> &dyn Any {
self
}
}
2 changes: 1 addition & 1 deletion backend/src/worker/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::do_work::{do_check_email_work, CheckEmailTask, TaskError};
use super::response::send_single_shot_reply;
use super::single_shot::send_single_shot_reply;
use crate::config::{BackendConfig, RabbitMQConfig, ThrottleConfig};
use crate::worker::do_work::CheckEmailJobId;
use anyhow::Context;
Expand Down
2 changes: 1 addition & 1 deletion backend/src/worker/do_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::config::BackendConfig;
use crate::worker::response::send_single_shot_reply;
use crate::worker::single_shot::send_single_shot_reply;
use check_if_email_exists::{
check_email, CheckEmailInput, CheckEmailOutput, Reachable, LOG_TARGET,
};
Expand Down
2 changes: 1 addition & 1 deletion backend/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@

pub mod consume;
pub mod do_work;
pub mod response;
pub mod single_shot;

pub use consume::{run_worker, setup_rabbit_mq};
File renamed without changes.

0 comments on commit 7e0b0a7

Please sign in to comment.