From b784c77339f05b77053882a11635acfdc867af47 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Thu, 12 Dec 2024 00:20:12 +0100 Subject: [PATCH] refactor(backend): Config storage as an enum (#1543) * Using dynamic Sotrage * Make it build * More comments * ckippy * clippy * remove worker feature * Add commercial license trial * Fix storage * Make it pass * clippy * Add commercial license * revert --- ...95660c56e852770a4eac47576089e704322a.json} | 6 +- ...b712359553bc295db5689f254f623d172326.json} | 6 +- .vscode/settings.json | 3 - Cargo.lock | 13 ++ backend/Cargo.toml | 21 +-- backend/Dockerfile | 2 +- backend/README.md | 2 +- backend/backend_config.toml | 31 +++- .../20240929230957_v1_worker_results.up.sql | 4 +- backend/src/config.rs | 162 ++++++++++-------- backend/src/db.rs | 39 ----- backend/src/http/error.rs | 15 +- backend/src/http/mod.rs | 31 ++-- backend/src/http/v1/check_email/post.rs | 34 +++- backend/src/lib.rs | 5 +- backend/src/main.rs | 27 +-- .../src/storage/commercial_license_trial.rs | 43 +++++ backend/src/storage/error.rs | 27 +++ backend/src/storage/mod.rs | 53 ++++++ backend/src/storage/postgres.rs | 102 +++++++++++ backend/src/worker/consume.rs | 4 +- backend/src/worker/do_work.rs | 32 ++-- backend/src/worker/mod.rs | 2 +- .../worker/{response.rs => single_shot.rs} | 61 +------ core/src/lib.rs | 8 +- core/src/rules.rs | 20 ++- 26 files changed, 470 insertions(+), 283 deletions(-) rename .sqlx/{query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json => query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json} (54%) rename .sqlx/{query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json => query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json} (54%) delete mode 100644 .vscode/settings.json delete mode 100644 backend/src/db.rs create mode 100644 backend/src/storage/commercial_license_trial.rs create mode 100644 backend/src/storage/error.rs create mode 100644 backend/src/storage/mod.rs create mode 100644 backend/src/storage/postgres.rs rename backend/src/worker/{response.rs => single_shot.rs} (70%) diff --git a/.sqlx/query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json b/.sqlx/query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json similarity index 54% rename from .sqlx/query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json rename to .sqlx/query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json index abcac7ded..aae6fd4e4 100644 --- a/.sqlx/query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json +++ b/.sqlx/query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n\t\t\t\tINSERT INTO v1_task_result (payload, job_id, backend_name, error)\n\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\tRETURNING id\n\t\t\t\t", + "query": "\n\t\t\t\t\tINSERT INTO v1_task_result (payload, job_id, extra, error)\n\t\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\t\tRETURNING id\n\t\t\t\t\t", "describe": { "columns": [ { @@ -13,7 +13,7 @@ "Left": [ "Jsonb", "Int4", - "Text", + "Jsonb", "Text" ] }, @@ -21,5 +21,5 @@ false ] }, - "hash": "f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1" + "hash": "068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a" } diff --git a/.sqlx/query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json b/.sqlx/query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json similarity index 54% rename from .sqlx/query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json rename to .sqlx/query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json index b8242e060..2de7122e0 100644 --- a/.sqlx/query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json +++ b/.sqlx/query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n\t\t\t\tINSERT INTO v1_task_result (payload, job_id, backend_name, result)\n\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\tRETURNING id\n\t\t\t\t", + "query": "\n\t\t\t\t\tINSERT INTO v1_task_result (payload, job_id, extra, result)\n\t\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\t\tRETURNING id\n\t\t\t\t\t", "describe": { "columns": [ { @@ -13,7 +13,7 @@ "Left": [ "Jsonb", "Int4", - "Text", + "Jsonb", "Jsonb" ] }, @@ -21,5 +21,5 @@ false ] }, - "hash": "aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2" + "hash": "de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326" } diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 2decb065a..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "rust-analyzer.cargo.features": "all" -} diff --git a/Cargo.lock b/Cargo.lock index ad6648306..dc6cffc0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2967,6 +2967,7 @@ dependencies = [ "openssl", "reqwest 0.12.5", "sentry", + "sentry-anyhow", "serde", "serde_json", "sqlx", @@ -2975,6 +2976,7 @@ dependencies = [ "tokio", "tokio-executor-trait", "tokio-reactor-trait", + "toml", "tracing", "tracing-subscriber", "uuid 1.10.0", @@ -3408,6 +3410,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "sentry-anyhow" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338ef04f73ca2fb1130ebab3853dca36041aa219a442ae873627373887660c36" +dependencies = [ + "anyhow", + "sentry-backtrace", + "sentry-core", +] + [[package]] name = "sentry-backtrace" version = "0.23.0" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7ce3794a0..b150c75fa 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -12,13 +12,14 @@ check-if-email-exists = { path = "../core", features = ["sentry"] } config = "0.14" csv = "1.3.0" dotenv = "0.15.0" -futures = { version = "0.3.30", optional = true } -lapin = { version = "2.3.1", optional = true } -tokio-executor-trait = { version = "2.1.1", optional = true } -tokio-reactor-trait = { version = "1.1.0", optional = true } +futures = { version = "0.3.30" } +lapin = { version = "2.3.1" } +tokio-executor-trait = { version = "2.1.1" } +tokio-reactor-trait = { version = "1.1.0" } openssl = { version = "0.10.64", features = ["vendored"] } -reqwest = { version = "0.12.5", features = ["json", "socks"], optional = true } +reqwest = { version = "0.12.5", features = ["json", "socks"] } sentry = "0.23" +sentry-anyhow = "0.23" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sqlx = { version = "0.7", features = [ @@ -37,11 +38,5 @@ tracing-subscriber = "0.3.18" uuid = "1.10" warp = "0.3" -[features] -worker = [ - "futures", - "lapin", - "tokio-executor-trait", - "tokio-reactor-trait", - "reqwest", -] +[dev-dependencies] +toml = "0.8" diff --git a/backend/Dockerfile b/backend/Dockerfile index 14c7fcd56..dc9393a45 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -14,7 +14,7 @@ COPY . . ENV SQLX_OFFLINE=true -RUN cargo build --bin reacher_backend --features worker --release --target=x86_64-unknown-linux-musl +RUN cargo build --bin reacher_backend --release --target=x86_64-unknown-linux-musl # ------------------------------------------------------------------------------ # Final Stage diff --git a/backend/README.md b/backend/README.md index 166d8a91e..b5c21c6ee 100644 --- a/backend/README.md +++ b/backend/README.md @@ -57,7 +57,7 @@ $ git clone https://github.com/reacherhq/check-if-email-exists $ cd check-if-email-exists/backend # Run the backend binary in release mode (slower build, but more performant). -$ cargo run --release --bin reacher_backend --features worker +$ cargo run --release --bin reacher_backend ``` The server will then be listening on `http://127.0.0.1:8080`. diff --git a/backend/backend_config.toml b/backend/backend_config.toml index b4f6fbaf9..6a8978323 100644 --- a/backend/backend_config.toml +++ b/backend/backend_config.toml @@ -44,6 +44,11 @@ webdriver_addr = "http://localhost:9515" # Env variable: RCH__SMTP_TIMEOUT # smtp_timeout = 45 +# Optional Sentry DSN. If set, all errors will be sent to Sentry. +# +# Env variable: RCH__SENTRY_DSN +# sentry_dsn = "" + # Uncomment the lines below to route all SMTP verification requests # through a specified proxy. Note that the proxy must be a SOCKS5 proxy to work # with the SMTP protocol. This proxy will not be used for headless @@ -119,14 +124,22 @@ concurrency = 5 # max_requests_per_hour = 1000 # max_requests_per_day = 20000 -# Postgres configuration. Currently, a Postgres database is required to store -# the results of the verifications. This might change in the future, allowing -# for pluggable storage. -[worker.postgres] -# Env variable: RCH__WORKER__POSTGRES__DB_URL -db_url = "postgresql://localhost/reacherdb" +# Below are the configurations for the storage of the email verification +# results. We currently support the following storage backends: +# - Postgres +# +# Uncomment the following line to configure the storage to use Postgres. +# [storage.postgres] -# Optional Sentry DSN. If set, all errors will be sent to Sentry. +# # URL to connect to the Postgres database. # -# Env variable: RCH__SENTRY_DSN -# sentry_dsn = "" +# Env variable: RCH__STORAGE__0__POSTGRES__DB_URL +# db_url = "postgresql://localhost/reacherdb" +# +# If you wish to store additional data along with the verification results, +# you can add a JSON object to the "extra" field. This object will be stored +# as a JSONB column in the database. This is for example useful to track who +# initiated the verification request in a multi-tenant system. +# +# Env variable: RCH__STORAGE__0__POSTGRES__TABLE_NAME +# extra = { "my_custom_key" = "my_custom_value" } diff --git a/backend/migrations/20240929230957_v1_worker_results.up.sql b/backend/migrations/20240929230957_v1_worker_results.up.sql index d055b77cf..926437d40 100644 --- a/backend/migrations/20240929230957_v1_worker_results.up.sql +++ b/backend/migrations/20240929230957_v1_worker_results.up.sql @@ -6,9 +6,9 @@ CREATE TABLE v1_bulk_job ( CREATE TABLE v1_task_result ( id SERIAL PRIMARY KEY, - job_id INTEGER NOT NULL REFERENCES v1_bulk_job(id) ON DELETE CASCADE, + job_id INTEGER REFERENCES v1_bulk_job(id) ON DELETE CASCADE, payload JSONB NOT NULL, - backend_name TEXT NOT NULL, + extra JSONB, -- any extra data that needs to be stored result JSONB, error TEXT, created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL diff --git a/backend/src/config.rs b/backend/src/config.rs index c1edc8228..65abb133d 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -14,23 +14,18 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use crate::create_db; -#[cfg(feature = "worker")] +use crate::storage::{postgres::PostgresStorage, StorageAdapter}; use crate::worker::do_work::TaskWebhook; -#[cfg(feature = "worker")] use crate::worker::setup_rabbit_mq; -use anyhow::bail; +use anyhow::{bail, Context}; use check_if_email_exists::{ CheckEmailInputProxy, GmailVerifMethod, HotmailB2BVerifMethod, HotmailB2CVerifMethod, YahooVerifMethod, LOG_TARGET, }; use config::Config; -#[cfg(feature = "worker")] use lapin::Channel; use serde::{Deserialize, Serialize}; use sqlx::PgPool; -use std::env; -#[cfg(feature = "worker")] use std::sync::Arc; use tracing::warn; @@ -64,12 +59,18 @@ pub struct BackendConfig { /// Worker configuration, only present if the backend is a worker. pub worker: WorkerConfig, + /// Configuration on where to store the email verification results. + pub storage: Option, + + /// Whether to enable the Commercial License Trial. Setting this to true + pub commercial_license_trial: Option, + // Internal fields, not part of the configuration. #[serde(skip)] - pg_pool: Option, - #[cfg(feature = "worker")] - #[serde(skip)] channel: Option>, + + #[serde(skip)] + storage_adapter: Arc, } impl BackendConfig { @@ -83,31 +84,17 @@ impl BackendConfig { self.worker.enable, &self.worker.throttle, &self.worker.rabbitmq, - &self.worker.postgres, - &self.pg_pool, - #[cfg(feature = "worker")] &self.channel, ) { - #[cfg(feature = "worker")] - ( - true, - Some(throttle), - Some(rabbitmq), - Some(postgres), - Some(pg_pool), - Some(channel), - ) => Ok(MustWorkerConfig { - pg_pool: pg_pool.clone(), - #[cfg(feature = "worker")] + (true, Some(throttle), Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig { channel: channel.clone(), throttle: throttle.clone(), rabbitmq: rabbitmq.clone(), - #[cfg(feature = "worker")] + webhook: self.worker.webhook.clone(), - postgres: postgres.clone(), }), - #[cfg(feature = "worker")] - (true, _, _, _, _, _) => bail!("Worker configuration is missing"), + + (true, _, _, _) => bail!("Worker configuration is missing"), _ => bail!("Calling must_worker_config on a non-worker backend"), } } @@ -115,43 +102,45 @@ impl BackendConfig { /// Attempt connection to the Postgres database and RabbitMQ. Also populates /// the internal `pg_pool` and `channel` fields with the connections. pub async fn connect(&mut self) -> Result<(), anyhow::Error> { - let pg_pool = if self.worker.enable { - let db_url = self - .worker - .postgres - .as_ref() - .map(|c| &c.db_url) - .ok_or_else(|| { - anyhow::anyhow!("Worker configuration is missing the postgres configuration") - })?; - Some(create_db(db_url).await?) - } else if let Ok(db_url) = env::var("DATABASE_URL") { - // For legacy reasons, we also support the DATABASE_URL environment variable: - Some(create_db(&db_url).await?) + match &self.storage { + Some(StorageConfig::Postgres(config)) => { + let storage = PostgresStorage::new(&config.db_url, config.extra.clone()) + .await + .with_context(|| format!("Connecting to postgres DB {}", config.db_url))?; + + self.storage_adapter = Arc::new(StorageAdapter::Postgres(storage)); + } + _ => { + self.storage_adapter = Arc::new(StorageAdapter::Noop); + } + } + + let channel = if self.worker.enable { + let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| { + anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration") + })?; + let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?; + Some(Arc::new(channel)) } else { None }; - self.pg_pool = pg_pool; - - #[cfg(feature = "worker")] - { - let channel = if self.worker.enable { - let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| { - anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration") - })?; - let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?; - Some(Arc::new(channel)) - } else { - None - }; - self.channel = channel; - } + self.channel = channel; Ok(()) } + /// Get all storages as a Vec. We don't really care about the keys in the + /// HashMap, except for deserialize purposes. + pub fn get_storage_adapter(&self) -> Arc { + self.storage_adapter.clone() + } + + /// Get the Postgres connection pool, if the storage is Postgres. pub fn get_pg_pool(&self) -> Option { - self.pg_pool.clone() + match self.storage_adapter.as_ref() { + StorageAdapter::Postgres(storage) => Some(storage.pg_pool.clone()), + StorageAdapter::Noop => None, + } } } @@ -175,26 +164,18 @@ pub struct WorkerConfig { pub throttle: Option, pub rabbitmq: Option, /// Optional webhook configuration to send email verification results. - #[cfg(feature = "worker")] pub webhook: Option, - /// Postgres database configuration to store email verification - /// results. - pub postgres: Option, } /// Worker configuration that must be present if worker.enable is true. Used as /// a domain type to ensure that the worker configuration is present. #[derive(Debug, Clone)] pub struct MustWorkerConfig { - pub pg_pool: PgPool, - #[cfg(feature = "worker")] pub channel: Arc, pub throttle: ThrottleConfig, pub rabbitmq: RabbitMQConfig, - #[cfg(feature = "worker")] pub webhook: Option, - pub postgres: PostgresConfig, } #[derive(Debug, Deserialize, Clone, Serialize)] @@ -204,11 +185,6 @@ pub struct RabbitMQConfig { pub concurrency: u16, } -#[derive(Debug, Deserialize, Clone, Serialize)] -pub struct PostgresConfig { - pub db_url: String, -} - #[derive(Debug, Deserialize, Clone, Serialize)] pub struct ThrottleConfig { pub max_requests_per_second: Option, @@ -229,6 +205,28 @@ impl ThrottleConfig { } } +#[derive(Debug, Default, Deserialize, Clone, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum StorageConfig { + /// Store the email verification results in the Postgres database. + Postgres(PostgresConfig), + /// Don't store the email verification results. + #[default] + Noop, +} + +#[derive(Debug, Deserialize, Clone, PartialEq, Serialize)] +pub struct PostgresConfig { + pub db_url: String, + pub extra: Option, +} + +#[derive(Debug, Deserialize, Clone, Serialize)] +pub struct CommercialLicenseTrialConfig { + pub api_token: String, + pub url: String, +} + /// Load the worker configuration from the worker_config.toml file and from the /// environment. pub async fn load_config() -> Result { @@ -247,13 +245,35 @@ pub async fn load_config() -> Result { #[cfg(test)] mod tests { - use super::load_config; + use super::*; use std::env; #[tokio::test] async fn test_env_vars() { env::set_var("RCH__BACKEND_NAME", "test-backend"); + env::set_var("RCH__STORAGE__POSTGRES__DB_URL", "test2"); let cfg = load_config().await.unwrap(); assert_eq!(cfg.backend_name, "test-backend"); + assert_eq!( + cfg.storage, + Some(StorageConfig::Postgres(PostgresConfig { + db_url: "test2".to_string(), + extra: None, + })) + ); + } + + #[tokio::test] + async fn test_serialize_storage_config() { + let storage_config = StorageConfig::Postgres(PostgresConfig { + db_url: "postgres://localhost:5432/test1".to_string(), + extra: None, + }); + + let expected = r#"[postgres] +db_url = "postgres://localhost:5432/test1" +"#; + + assert_eq!(expected, toml::to_string(&storage_config).unwrap(),); } } diff --git a/backend/src/db.rs b/backend/src/db.rs deleted file mode 100644 index c260675bc..000000000 --- a/backend/src/db.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use anyhow::Context; -use check_if_email_exists::LOG_TARGET; -use sqlx::{postgres::PgPoolOptions, PgPool}; - -use tracing::{debug, info}; - -/// Create a DB pool. -pub async fn create_db(db_url: &str) -> Result { - debug!(target: LOG_TARGET, "Connecting to DB: {}", db_url); - // create connection pool with database - // connection pool internally the shared db connection - // with arc so it can safely be cloned and shared across threads - let pool = PgPoolOptions::new() - .connect(db_url) - .await - .with_context(|| format!("Connecting to postgres DB {db_url}"))?; - - sqlx::migrate!("./migrations").run(&pool).await?; - - info!(target: LOG_TARGET, table="v1_task_result", "Connected to DB, Reacher will write verification results to DB"); - - Ok(pool) -} diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs index ac4c01278..108e39268 100644 --- a/backend/src/http/error.rs +++ b/backend/src/http/error.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use crate::storage::error::StorageError; use check_if_email_exists::{CheckEmailInputBuilderError, LOG_TARGET}; use serde::ser::SerializeStruct; use serde::Serialize; @@ -24,7 +25,6 @@ use warp::{http::StatusCode, reject}; /// Trait combining Display and Debug. pub trait DisplayDebug: fmt::Display + Debug + Sync + Send {} - impl DisplayDebug for T {} /// Struct describing an error response. @@ -77,7 +77,6 @@ impl From for ReacherResponseError { } } -#[cfg(feature = "worker")] impl From for ReacherResponseError { fn from(e: lapin::Error) -> Self { ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e) @@ -114,6 +113,18 @@ impl From for ReacherResponseError { } } +impl From for ReacherResponseError { + fn from(e: StorageError) -> Self { + ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e) + } +} + +impl From for ReacherResponseError { + fn from(e: reqwest::Error) -> Self { + ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e) + } +} + /// This function receives a `Rejection` and tries to return a custom value, /// otherwise simply passes the rejection along. pub async fn handle_rejection(err: warp::Rejection) -> Result { diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs index fdd6a20fd..be5fe373b 100644 --- a/backend/src/http/mod.rs +++ b/backend/src/http/mod.rs @@ -16,7 +16,6 @@ mod error; mod v0; -#[cfg(feature = "worker")] mod v1; mod version; @@ -38,7 +37,8 @@ pub fn create_routes( config: Arc, ) -> impl Filter + Clone { let pg_pool = config.get_pg_pool(); - let t = version::get::get_version() + + version::get::get_version() .or(v0::check_email::post::post_check_email(Arc::clone(&config))) // The 3 following routes will 404 if o is None. .or(v0::bulk::post::create_bulk_job( @@ -46,23 +46,14 @@ pub fn create_routes( pg_pool.clone(), )) .or(v0::bulk::get::get_bulk_job_status(pg_pool.clone())) - .or(v0::bulk::results::get_bulk_job_result(pg_pool)); - - #[cfg(feature = "worker")] - { - t.or(v1::check_email::post::v1_check_email(Arc::clone(&config))) - .or(v1::bulk::post::v1_create_bulk_job(Arc::clone(&config))) - .or(v1::bulk::get_progress::v1_get_bulk_job_progress( - Arc::clone(&config), - )) - .or(v1::bulk::get_results::v1_get_bulk_job_results(config)) - .recover(handle_rejection) - } - - #[cfg(not(feature = "worker"))] - { - t.recover(handle_rejection) - } + .or(v0::bulk::results::get_bulk_job_result(pg_pool)) + .or(v1::check_email::post::v1_check_email(Arc::clone(&config))) + .or(v1::bulk::post::v1_create_bulk_job(Arc::clone(&config))) + .or(v1::bulk::get_progress::v1_get_bulk_job_progress( + Arc::clone(&config), + )) + .or(v1::bulk::get_results::v1_get_bulk_job_results(config)) + .recover(handle_rejection) } /// Runs the Warp server. @@ -121,7 +112,7 @@ pub fn with_db( pool.ok_or_else(|| { warp::reject::custom(ReacherResponseError::new( StatusCode::SERVICE_UNAVAILABLE, - "Please configure a database on Reacher before calling this endpoint", + "Please configure a Postgres database on Reacher before calling this endpoint", )) }) } diff --git a/backend/src/http/v1/check_email/post.rs b/backend/src/http/v1/check_email/post.rs index 97c383ea8..16e2a595d 100644 --- a/backend/src/http/v1/check_email/post.rs +++ b/backend/src/http/v1/check_email/post.rs @@ -17,7 +17,7 @@ //! This file implements the `POST /v1/check_email` endpoint. use check_if_email_exists::{check_email, LOG_TARGET}; -use futures::StreamExt; +use futures::{StreamExt, TryFutureExt}; use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicRejectOptions, QueueDeclareOptions, }; @@ -31,9 +31,10 @@ use crate::config::BackendConfig; use crate::http::v0::check_email::post::{with_config, CheckEmailRequest}; use crate::http::v1::bulk::post::publish_task; use crate::http::{check_header, ReacherResponseError}; +use crate::storage::commercial_license_trial::send_to_reacher; use crate::worker::consume::MAX_QUEUE_PRIORITY; use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask}; -use crate::worker::response::SingleShotReply; +use crate::worker::single_shot::SingleShotReply; /// The main endpoint handler that implements the logic of this route. async fn http_handler( @@ -51,8 +52,33 @@ async fn http_handler( // If worker mode is disabled, we do a direct check, and skip rabbitmq. if !config.worker.enable { - let result = check_email(&body.to_check_email_input(Arc::clone(&config))).await; - let result_bz = serde_json::to_vec(&result).map_err(ReacherResponseError::from)?; + let input = body.to_check_email_input(Arc::clone(&config)); + let result = check_email(&input).await; + let value = Ok(result); + + // Also store the result "manually", since we don't have a worker. + let storage = config.get_storage_adapter(); + storage + .store( + &CheckEmailTask { + input: input.clone(), + job_id: CheckEmailJobId::SingleShot, + webhook: None, + }, + &value, + storage.get_extra(), + ) + .map_err(ReacherResponseError::from) + .await?; + + // If we're in the Commercial License Trial, we also store the + // result by sending it to back to Reacher. + send_to_reacher(Arc::clone(&config), &input.to_email, &value) + .await + .map_err(ReacherResponseError::from)?; + + let result_bz = serde_json::to_vec(&value).map_err(ReacherResponseError::from)?; + return Ok(warp::reply::with_header( result_bz, "Content-Type", diff --git a/backend/src/lib.rs b/backend/src/lib.rs index b6ecc9cae..71c140ccf 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -15,11 +15,8 @@ // along with this program. If not, see . pub mod config; -mod db; pub mod http; -#[cfg(feature = "worker")] +mod storage; pub mod worker; -pub use db::create_db; - const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/backend/src/main.rs b/backend/src/main.rs index 2a262b82b..e815cd62b 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -20,7 +20,6 @@ use check_if_email_exists::{setup_sentry, LOG_TARGET}; use reacher_backend::config::load_config; use reacher_backend::http::run_warp_server; -#[cfg(feature = "worker")] use reacher_backend::worker::run_worker; use std::sync::Arc; use tracing::{debug, info}; @@ -45,25 +44,17 @@ async fn main() -> Result<(), anyhow::Error> { let config = Arc::new(config); - #[cfg(feature = "worker")] - { - let server_future = run_warp_server(Arc::clone(&config)); - let worker_future = async { - if config.worker.enable { - run_worker(config).await?; - } - Ok(()) - }; + let server_future = run_warp_server(Arc::clone(&config)); + let worker_future = async { + if config.worker.enable { + run_worker(config).await?; + } + Ok(()) + }; - tokio::try_join!(server_future, worker_future)?; + tokio::try_join!(server_future, worker_future)?; - info!("Shutting down..."); - } - - #[cfg(not(feature = "worker"))] - { - run_warp_server(config).await?; - } + info!("Shutting down..."); Ok(()) } diff --git a/backend/src/storage/commercial_license_trial.rs b/backend/src/storage/commercial_license_trial.rs new file mode 100644 index 000000000..4c84f5e65 --- /dev/null +++ b/backend/src/storage/commercial_license_trial.rs @@ -0,0 +1,43 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use crate::config::{BackendConfig, CommercialLicenseTrialConfig}; +use crate::worker::do_work::TaskError; +use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; +use std::sync::Arc; +use tracing::debug; + +/// If we're in the Commercial License Trial, we also store the +/// result by sending it to back to Reacher. +pub async fn send_to_reacher( + config: Arc, + email: &str, + worker_output: &Result, +) -> Result<(), reqwest::Error> { + if let Some(CommercialLicenseTrialConfig { api_token, url }) = &config.commercial_license_trial + { + let res = reqwest::Client::new() + .post(url) + .header("Authorization", api_token) + .json(worker_output) + .send() + .await?; + let res = res.text().await?; + debug!(target: LOG_TARGET, email=email, res=res, "Sent result to Reacher Commercial License Trial"); + } + + Ok(()) +} diff --git a/backend/src/storage/error.rs b/backend/src/storage/error.rs new file mode 100644 index 000000000..d88f4d4b4 --- /dev/null +++ b/backend/src/storage/error.rs @@ -0,0 +1,27 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum StorageError { + #[error("SQLX error: {0}")] + SqlxError(#[from] sqlx::error::Error), + #[error("SQLX migrate error: {0}")] + MigrateError(#[from] sqlx::migrate::MigrateError), + #[error("Serde JSON error: {0}")] + SerdeJsonError(#[from] serde_json::Error), +} diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs new file mode 100644 index 000000000..992500f1a --- /dev/null +++ b/backend/src/storage/mod.rs @@ -0,0 +1,53 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +pub mod commercial_license_trial; +pub mod error; +pub mod postgres; + +use crate::worker::do_work::{CheckEmailTask, TaskError}; +use check_if_email_exists::CheckEmailOutput; +use error::StorageError; +use postgres::PostgresStorage; +use std::fmt::Debug; + +#[derive(Debug, Default)] +pub enum StorageAdapter { + Postgres(PostgresStorage), + #[default] + Noop, +} + +impl StorageAdapter { + pub async fn store( + &self, + task: &CheckEmailTask, + worker_output: &Result, + extra: Option, + ) -> Result<(), StorageError> { + match self { + StorageAdapter::Postgres(storage) => storage.store(task, worker_output, extra).await, + StorageAdapter::Noop => Ok(()), + } + } + + pub fn get_extra(&self) -> Option { + match self { + StorageAdapter::Postgres(storage) => storage.get_extra().clone(), + StorageAdapter::Noop => None, + } + } +} diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs new file mode 100644 index 000000000..401b4e94a --- /dev/null +++ b/backend/src/storage/postgres.rs @@ -0,0 +1,102 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use super::error::StorageError; +use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError}; +use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; +use sqlx::postgres::PgPoolOptions; +use sqlx::PgPool; +use tracing::{debug, info}; + +#[derive(Debug)] +pub struct PostgresStorage { + pub pg_pool: PgPool, + extra: Option, +} + +impl PostgresStorage { + pub async fn new(db_url: &str, extra: Option) -> Result { + debug!(target: LOG_TARGET, "Connecting to DB: {}", db_url); + // create connection pool with database + // connection pool internally the shared db connection + // with arc so it can safely be cloned and shared across threads + let pg_pool = PgPoolOptions::new().connect(db_url).await?; + + sqlx::migrate!("./migrations").run(&pg_pool).await?; + + info!(target: LOG_TARGET, table="v1_task_result", "Connected to DB, Reacher will write verification results to DB"); + + Ok(Self { pg_pool, extra }) + } + + pub async fn store( + &self, + task: &CheckEmailTask, + worker_output: &Result, + extra: Option, + ) -> Result<(), StorageError> { + let payload_json = serde_json::to_value(task)?; + + match worker_output { + Ok(output) => { + let output_json = serde_json::to_value(output)?; + + sqlx::query!( + r#" + INSERT INTO v1_task_result (payload, job_id, extra, result) + VALUES ($1, $2, $3, $4) + RETURNING id + "#, + payload_json, + match task.job_id { + CheckEmailJobId::Bulk(job_id) => Some(job_id), + CheckEmailJobId::SingleShot => None, + }, + extra, + output_json, + ) + .fetch_one(&self.pg_pool) + .await?; + } + Err(err) => { + sqlx::query!( + r#" + INSERT INTO v1_task_result (payload, job_id, extra, error) + VALUES ($1, $2, $3, $4) + RETURNING id + "#, + payload_json, + match task.job_id { + CheckEmailJobId::Bulk(job_id) => Some(job_id), + CheckEmailJobId::SingleShot => None, + }, + extra, + err.to_string(), + ) + .fetch_one(&self.pg_pool) + .await?; + } + } + + debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB"); + + Ok(()) + } + + pub fn get_extra(&self) -> Option { + self.extra.clone() + } +} diff --git a/backend/src/worker/consume.rs b/backend/src/worker/consume.rs index 5dc0d5b78..9104f33c3 100644 --- a/backend/src/worker/consume.rs +++ b/backend/src/worker/consume.rs @@ -15,13 +15,14 @@ // along with this program. If not, see . use super::do_work::{do_check_email_work, CheckEmailTask, TaskError}; -use super::response::send_single_shot_reply; +use super::single_shot::send_single_shot_reply; use crate::config::{BackendConfig, RabbitMQConfig, ThrottleConfig}; use crate::worker::do_work::CheckEmailJobId; use anyhow::Context; use check_if_email_exists::LOG_TARGET; use futures::stream::StreamExt; use lapin::{options::*, types::FieldTable, Channel, Connection, ConnectionProperties}; +use sentry_anyhow::capture_anyhow; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Mutex; @@ -164,6 +165,7 @@ async fn consume_check_email(config: Arc) -> Result<(), anyhow::E do_check_email_work(&payload, delivery, channel_clone2, config_clone2).await { error!(target: LOG_TARGET, email=payload.input.to_email, error=?e, "Error processing message"); + capture_anyhow(&e); } }); diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs index 5cc99b078..214ca110a 100644 --- a/backend/src/worker/do_work.rs +++ b/backend/src/worker/do_work.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use super::response::save_to_db; use crate::config::BackendConfig; -use crate::worker::response::send_single_shot_reply; +use crate::storage::commercial_license_trial::send_to_reacher; +use crate::worker::single_shot::send_single_shot_reply; use check_if_email_exists::{ check_email, CheckEmailInput, CheckEmailOutput, Reachable, LOG_TARGET, }; @@ -138,25 +138,23 @@ pub(crate) async fn do_check_email_work( _ => { // This is the happy path. We acknowledge the message and: // - If it's a single-shot email verification, we send a reply to the client. - // - If it's a bulk verification, we save the result to the database. + // - In any case, we store the result. delivery.ack(BasicAckOptions::default()).await?; - match task.job_id { - CheckEmailJobId::SingleShot => { - send_single_shot_reply(channel, &delivery, &worker_output).await?; - } - CheckEmailJobId::Bulk(bulk_job_id) => { - save_to_db( - &config.backend_name, - config.get_pg_pool(), - task, - bulk_job_id, - &worker_output, - ) - .await?; - } + if let CheckEmailJobId::SingleShot = task.job_id { + send_single_shot_reply(channel, &delivery, &worker_output).await?; } + // Store the result. + let storage = config.get_storage_adapter(); + storage + .store(task, &worker_output, storage.get_extra()) + .await?; + + // If we're in the Commercial License Trial, we also store the + // result by sending it to back to Reacher. + send_to_reacher(config, &task.input.to_email, &worker_output).await?; + info!(target: LOG_TARGET, email=task.input.to_email, worker_output=?worker_output.map(|o| o.is_reachable), diff --git a/backend/src/worker/mod.rs b/backend/src/worker/mod.rs index 90b474c33..bffe565a6 100644 --- a/backend/src/worker/mod.rs +++ b/backend/src/worker/mod.rs @@ -22,6 +22,6 @@ pub mod consume; pub mod do_work; -pub mod response; +pub mod single_shot; pub use consume::{run_worker, setup_rabbit_mq}; diff --git a/backend/src/worker/response.rs b/backend/src/worker/single_shot.rs similarity index 70% rename from backend/src/worker/response.rs rename to backend/src/worker/single_shot.rs index 0d1d73d4a..1b10c5cf2 100644 --- a/backend/src/worker/response.rs +++ b/backend/src/worker/single_shot.rs @@ -14,77 +14,18 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use super::do_work::{CheckEmailTask, TaskError}; +use super::do_work::TaskError; use anyhow::bail; use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; use lapin::message::Delivery; use lapin::options::BasicPublishOptions; use lapin::{BasicProperties, Channel}; use serde::{Deserialize, Serialize}; -use sqlx::PgPool; use std::convert::TryFrom; use std::sync::Arc; use tracing::debug; use warp::http::StatusCode; -/// Save the task result to the database. This only happens if the task is a -/// part of a bulk verification job. If no pool is provided, the function will -/// simply return without doing anything. -/// -/// # Panics -/// -/// Panics if the task is a single-shot task, i.e. if `payload.job_id` is `None`. -pub async fn save_to_db( - backend_name: &str, - pg_pool: Option, - task: &CheckEmailTask, - bulk_job_id: i32, - worker_output: &Result, -) -> Result<(), anyhow::Error> { - let pg_pool = pg_pool.ok_or_else(|| anyhow::anyhow!("No DB pool provided"))?; - - let payload_json = serde_json::to_value(task)?; - - match worker_output { - Ok(output) => { - let output_json = serde_json::to_value(output)?; - - sqlx::query!( - r#" - INSERT INTO v1_task_result (payload, job_id, backend_name, result) - VALUES ($1, $2, $3, $4) - RETURNING id - "#, - payload_json, - bulk_job_id, - backend_name, - output_json, - ) - .fetch_one(&pg_pool) - .await?; - } - Err(err) => { - sqlx::query!( - r#" - INSERT INTO v1_task_result (payload, job_id, backend_name, error) - VALUES ($1, $2, $3, $4) - RETURNING id - "#, - payload_json, - bulk_job_id, - backend_name, - err.to_string(), - ) - .fetch_one(&pg_pool) - .await?; - } - } - - debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB"); - - Ok(()) -} - /// For single-shot email verifications, the worker will send a reply to the /// client with the result of the verification. Since both CheckEmailOutput and /// TaskError are not Deserialize, we need to create a new struct that can be diff --git a/core/src/lib.rs b/core/src/lib.rs index 23619ff59..d2d6da0b5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -253,7 +253,7 @@ pub async fn check_email(input: &CheckEmailInput) -> CheckEmailOutput { let end_time = SystemTime::now(); - CheckEmailOutput { + let output = CheckEmailOutput { input: to_email.to_string(), is_reachable: calculate_reachable(&my_misc, &my_smtp), misc: Ok(my_misc), @@ -269,5 +269,9 @@ pub async fn check_email(input: &CheckEmailInput) -> CheckEmailOutput { smtp: smtp_debug, backend_name: input.backend_name.clone(), }, - } + }; + + log_unknown_errors(&output, &input.backend_name); + + output } diff --git a/core/src/rules.rs b/core/src/rules.rs index 30c209126..88d76b4ef 100644 --- a/core/src/rules.rs +++ b/core/src/rules.rs @@ -92,14 +92,16 @@ mod tests { #[test] fn should_skip_catch_all() { - assert_eq!( - true, - has_rule("gmail.com", "alt4.aspmx.l.google.com.", &Rule::SkipCatchAll) - ); - - assert_eq!( - true, - has_rule("domain.com", ".antispamcloud.com.", &Rule::SkipCatchAll) - ) + assert!(has_rule( + "gmail.com", + "alt4.aspmx.l.google.com.", + &Rule::SkipCatchAll + )); + + assert!(has_rule( + "domain.com", + ".antispamcloud.com.", + &Rule::SkipCatchAll + )) } }