From c0ddce273d39252474672fa56a24fa9a10a178bb Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1729@users.noreply.github.com> Date: Mon, 11 Dec 2023 12:34:38 +0100 Subject: [PATCH] refactor: Merge worker into backend (#1404) * Put worker into backend * Rename to http * Add try join * Change LOG_TARGET * Revemo stray logs * fmt * fmt * Fix clippy * clippy --- .github/workflows/deploy_backend.yml | 1 - .github/workflows/deploy_worker.yml | 25 --- .vscode/settings.json | 3 + Cargo.lock | 25 +-- Cargo.toml | 2 +- Dockerfile.backend => Dockerfile | 2 +- Dockerfile.worker | 55 ----- backend/Cargo.toml | 19 +- backend/src/bin/prune_db.rs | 4 +- backend/src/{routes => http}/bulk/db.rs | 0 backend/src/{routes => http}/bulk/error.rs | 0 backend/src/{routes => http}/bulk/get.rs | 10 +- backend/src/http/bulk/mod.rs | 65 ++++++ backend/src/{routes => http}/bulk/post.rs | 13 +- .../bulk/results/csv_helper.rs | 0 .../src/{routes => http}/bulk/results/mod.rs | 26 ++- backend/src/{routes => http}/bulk/task.rs | 26 +-- .../src/{routes => http}/check_email/mod.rs | 0 .../src/{routes => http}/check_email/post.rs | 0 backend/src/http/mod.rs | 96 +++++++++ backend/src/{routes => http}/version/get.rs | 0 backend/src/{routes => http}/version/mod.rs | 0 backend/src/lib.rs | 4 +- backend/src/main.rs | 124 ++---------- backend/src/routes/bulk/mod.rs | 24 --- backend/src/routes/mod.rs | 35 ---- backend/src/sentry_util.rs | 12 +- .../src/worker/check_email.rs | 41 +--- .../src/main.rs => backend/src/worker/mod.rs | 21 +- backend/tests/check_email.rs | 4 +- backend/docker.sh => docker.sh | 0 worker/Cargo.toml | 24 --- worker/README.md | 12 -- worker/docker.sh | 5 - worker/src/sentry_util.rs | 189 ------------------ 35 files changed, 270 insertions(+), 597 deletions(-) delete mode 100644 .github/workflows/deploy_worker.yml create mode 100644 .vscode/settings.json rename Dockerfile.backend => Dockerfile (96%) delete mode 100644 Dockerfile.worker rename backend/src/{routes => http}/bulk/db.rs 
(100%) rename backend/src/{routes => http}/bulk/error.rs (100%) rename backend/src/{routes => http}/bulk/get.rs (98%) create mode 100644 backend/src/http/bulk/mod.rs rename backend/src/{routes => http}/bulk/post.rs (96%) rename backend/src/{routes => http}/bulk/results/csv_helper.rs (100%) rename backend/src/{routes => http}/bulk/results/mod.rs (96%) rename backend/src/{routes => http}/bulk/task.rs (97%) rename backend/src/{routes => http}/check_email/mod.rs (100%) rename backend/src/{routes => http}/check_email/post.rs (100%) create mode 100644 backend/src/http/mod.rs rename backend/src/{routes => http}/version/get.rs (100%) rename backend/src/{routes => http}/version/mod.rs (100%) delete mode 100644 backend/src/routes/bulk/mod.rs delete mode 100644 backend/src/routes/mod.rs rename worker/src/worker.rs => backend/src/worker/check_email.rs (62%) rename worker/src/main.rs => backend/src/worker/mod.rs (86%) rename backend/docker.sh => docker.sh (100%) delete mode 100644 worker/Cargo.toml delete mode 100644 worker/README.md delete mode 100755 worker/docker.sh delete mode 100644 worker/src/sentry_util.rs diff --git a/.github/workflows/deploy_backend.yml b/.github/workflows/deploy_backend.yml index 3b82248c2..88d060571 100644 --- a/.github/workflows/deploy_backend.yml +++ b/.github/workflows/deploy_backend.yml @@ -22,4 +22,3 @@ jobs: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} tags: "${{ steps.vars.outputs.GITHUB_TAG }}" - dockerfile: Dockerfile.backend diff --git a/.github/workflows/deploy_worker.yml b/.github/workflows/deploy_worker.yml deleted file mode 100644 index 457b50789..000000000 --- a/.github/workflows/deploy_worker.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: tag - -on: - push: - tags: - - "worker/v*.*.*" - -jobs: - docker-publish: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@master - - name: Set GITHUB_TAG arg - id: vars - run: echo ::set-output name=GITHUB_TAG::${GITHUB_REF:17} # Remove /refs/head/worker/ 
- - name: Print version - run: echo "Publishing reacherhq/worker:${{ steps.vars.outputs.GITHUB_TAG }}" - - name: Publish to Registry - uses: elgohr/Publish-Docker-Github-Action@v5 - with: - name: reacherhq/worker - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - tags: "${{ steps.vars.outputs.GITHUB_TAG }}" - dockerfile: Dockerfile.worker diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..a7857b019 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.cargo.features": ["worker"] +} diff --git a/Cargo.lock b/Cargo.lock index 4c3f86315..23833f1a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2412,37 +2412,24 @@ dependencies = [ "check-if-email-exists", "csv", "dotenv", - "env_logger", + "futures", + "futures-lite 2.1.0", + "lapin", "log", "openssl", + "reqwest", "sentry", "serde", "serde_json", "sqlx", "sqlxmq", "tokio", - "uuid 1.6.1", - "warp", -] - -[[package]] -name = "reacher_worker" -version = "1.0.0" -dependencies = [ - "async-global-executor", - "async-smtp", - "check-if-email-exists", - "futures-lite 2.1.0", - "lapin", - "reqwest", - "sentry", - "serde", - "serde_json", - "tokio", "tokio-executor-trait", "tokio-reactor-trait", "tracing", "tracing-subscriber", + "uuid 1.6.1", + "warp", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9f2b30d7d..861920f11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["backend", "cli", "core", "worker"] +members = ["backend", "cli", "core"] diff --git a/Dockerfile.backend b/Dockerfile similarity index 96% rename from Dockerfile.backend rename to Dockerfile index 2abbe4ced..17dfc6084 100644 --- a/Dockerfile.backend +++ b/Dockerfile @@ -31,7 +31,7 @@ USER root RUN apk add --no-cache chromium-chromedriver COPY --from=cargo-build /usr/src/reacher/target/x86_64-unknown-linux-musl/release/reacher_backend . -COPY --from=cargo-build /usr/src/reacher/backend/docker.sh . 
+COPY --from=cargo-build /usr/src/reacher/docker.sh . RUN chown chrome:chrome reacher_backend RUN chown chrome:chrome docker.sh diff --git a/Dockerfile.worker b/Dockerfile.worker deleted file mode 100644 index a84ccb0a6..000000000 --- a/Dockerfile.worker +++ /dev/null @@ -1,55 +0,0 @@ -# From https://shaneutt.com/blog/rust-fast-small-docker-image-builds/ - -# ------------------------------------------------------------------------------ -# Cargo Build Stage -# ------------------------------------------------------------------------------ - -FROM messense/rust-musl-cross:x86_64-musl as cargo-build - -WORKDIR /usr/src/reacher - -RUN rm -f target/x86_64-unknown-linux-musl/release/deps/reacher* - -COPY . . - -ENV SQLX_OFFLINE=true - -RUN cargo build --bin reacher_worker --release --target=x86_64-unknown-linux-musl - -# ------------------------------------------------------------------------------ -# Final Stage -# ------------------------------------------------------------------------------ - -FROM zenika/alpine-chrome - -WORKDIR /home/reacher/ - -USER root - -# Install chromedriver -# https://github.com/Zenika/alpine-chrome/blob/master/with-chromedriver/Dockerfile -RUN apk add --no-cache chromium-chromedriver - -COPY --from=cargo-build /usr/src/reacher/target/x86_64-unknown-linux-musl/release/reacher_worker . -COPY --from=cargo-build /usr/src/reacher/worker/docker.sh . - -RUN chown chrome:chrome reacher_worker -RUN chown chrome:chrome docker.sh - -# User chrome was created in zenika/alpine-chrome -USER chrome - -ENV RUST_LOG=reacher=info -ENV RCH_HTTP_HOST=0.0.0.0 -ENV PORT=8080 -ENV RCH_WEBDRIVER_ADDR=http://localhost:9515 -# Bulk verification is disabled by default. Set to 1 to enable it. 
-ENV RCH_ENABLE_BULK=0 - -EXPOSE 8080 - -# Remove entrypoint from parent Docker file -# https://stackoverflow.com/questions/40122152/how-to-remove-entrypoint-from-parent-image-on-dockerfile -ENTRYPOINT [] - -CMD ["./docker.sh"] diff --git a/backend/Cargo.toml b/backend/Cargo.toml index f9df0f9f6..71f4067bd 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -10,9 +10,12 @@ async-smtp = "0.6" check-if-email-exists = { path = "../core", features = ["headless"] } csv = "1.3.0" dotenv = "0.15.0" -env_logger = "0.10" +futures = { version = "0.3.29", optional = true } +futures-lite = { version = "2.1.0", optional = true } +lapin = { version = "2.3.1", optional = true } log = "0.4" openssl = { version = "0.10.57", features = ["vendored"] } +reqwest = { version = "0.11.22", features = ["json"], optional = true } sentry = "0.23" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -26,5 +29,19 @@ sqlx = { version = "0.7", features = [ ] } sqlxmq = "0.5" tokio = { version = "1.29", features = ["macros"] } +tokio-executor-trait = { version = "2.1.1", optional = true } +tokio-reactor-trait = { version = "1.1.0", optional = true } +tracing = "0.1.40" +tracing-subscriber = "0.3.18" uuid = "1.6" warp = "0.3" + +[features] +worker = [ + "futures", + "futures-lite", + "lapin", + "reqwest", + "tokio-executor-trait", + "tokio-reactor-trait", +] diff --git a/backend/src/bin/prune_db.rs b/backend/src/bin/prune_db.rs index 572dd17ad..5c59ba8ee 100644 --- a/backend/src/bin/prune_db.rs +++ b/backend/src/bin/prune_db.rs @@ -1,11 +1,11 @@ -use log::info; use sqlx::PgPool; use sqlx::Result; +use tracing::info; #[tokio::main] async fn main() -> Result<()> { dotenv::dotenv().expect("Unable to load environment variables from .env file"); - env_logger::init(); // Initialize the logger + tracing_subscriber::fmt::init(); let db_url = std::env::var("DATABASE_URL").expect("Unable to read DATABASE_URL env var"); let dry_mode: bool = std::env::var("DRY_RUN").is_ok(); diff 
--git a/backend/src/routes/bulk/db.rs b/backend/src/http/bulk/db.rs similarity index 100% rename from backend/src/routes/bulk/db.rs rename to backend/src/http/bulk/db.rs diff --git a/backend/src/routes/bulk/error.rs b/backend/src/http/bulk/error.rs similarity index 100% rename from backend/src/routes/bulk/error.rs rename to backend/src/http/bulk/error.rs diff --git a/backend/src/routes/bulk/get.rs b/backend/src/http/bulk/get.rs similarity index 98% rename from backend/src/routes/bulk/get.rs rename to backend/src/http/bulk/get.rs index a7b549cce..004f591c1 100644 --- a/backend/src/routes/bulk/get.rs +++ b/backend/src/http/bulk/get.rs @@ -16,13 +16,14 @@ //! This file implements the `GET /bulk/{id}` endpoint. +use check_if_email_exists::LOG_TARGET; use serde::Serialize; use sqlx::types::chrono::{DateTime, Utc}; use sqlx::{Pool, Postgres}; +use tracing::error; use warp::Filter; use super::{db::with_db, error::BulkError}; -use check_if_email_exists::LOG_TARGET; /// NOTE: Type conversions from postgres to rust types /// are according to the table given by @@ -83,11 +84,10 @@ async fn job_status( .fetch_one(&conn_pool) .await .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to get job record for [job={}] with [error={}]", - job_id, - e + job_id, e ); BulkError::from(e) })?; @@ -109,7 +109,7 @@ async fn job_status( .fetch_one(&conn_pool) .await .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to get aggregate info for [job={}] with [error={}]", job_id, diff --git a/backend/src/http/bulk/mod.rs b/backend/src/http/bulk/mod.rs new file mode 100644 index 000000000..061d27e3a --- /dev/null +++ b/backend/src/http/bulk/mod.rs @@ -0,0 +1,65 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at 
your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod db; +mod error; +pub mod get; +pub mod post; +pub mod results; +mod task; + +use std::env; + +use check_if_email_exists::LOG_TARGET; +use sqlx::{Pool, Postgres}; +use sqlxmq::{JobRegistry, JobRunnerHandle}; +use tracing::info; + +pub use task::email_verification_task; + +/// Create a job registry with one task: the email verification task. +pub async fn create_job_registry(pool: &Pool) -> Result { + let min_task_conc = env::var("RCH_MINIMUM_TASK_CONCURRENCY").map_or(10, |var| { + var.parse::() + .expect("Environment variable RCH_MINIMUM_TASK_CONCURRENCY should parse to usize") + }); + let max_conc_task_fetch = env::var("RCH_MAXIMUM_CONCURRENT_TASK_FETCH").map_or(20, |var| { + var.parse::() + .expect("Environment variable RCH_MAXIMUM_CONCURRENT_TASK_FETCH should parse to usize") + }); + + // registry needs to be given list of jobs it can accept + let registry = JobRegistry::new(&[email_verification_task]); + + // create runner for the message queue associated + // with this job registry + let registry = registry + // Create a job runner using the connection pool. + .runner(pool) + // Here is where you can configure the job runner + // Aim to keep 10-20 jobs running at a time. + .set_concurrency(min_task_conc, max_conc_task_fetch) + // Start the job runner in the background. + .run() + .await?; + + info!( + target: LOG_TARGET, + "Bulk endpoints enabled with concurrency min={min_task_conc} to max={max_conc_task_fetch}." 
+ ); + + Ok(registry) +} diff --git a/backend/src/routes/bulk/post.rs b/backend/src/http/bulk/post.rs similarity index 96% rename from backend/src/routes/bulk/post.rs rename to backend/src/http/bulk/post.rs index dffcd8400..29db497fc 100644 --- a/backend/src/routes/bulk/post.rs +++ b/backend/src/http/bulk/post.rs @@ -20,6 +20,7 @@ use check_if_email_exists::CheckEmailInputProxy; use check_if_email_exists::LOG_TARGET; use serde::{Deserialize, Serialize}; use sqlx::{Pool, Postgres}; +use tracing::{debug, error}; use warp::Filter; use super::{ @@ -106,11 +107,10 @@ async fn create_bulk_request( .fetch_one(&conn_pool) .await .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to create job record for [body={:?}] with [error={}]", - &body, - e + &body, e ); BulkError::from(e) })?; @@ -118,11 +118,10 @@ async fn create_bulk_request( for task_input in body.into_iter() { let task_uuid = submit_job(&conn_pool, rec.id, task_input).await?; - log::debug!( + debug!( target: LOG_TARGET, "Submitted task to sqlxmq for [job={}] with [uuid={}]", - rec.id, - task_uuid + rec.id, task_uuid ); } @@ -147,6 +146,6 @@ pub fn create_bulk_job( .and(warp::body::content_length_limit(1024 * 16)) .and(warp::body::json()) .and_then(create_bulk_request) - // View access logs by setting `RUST_LOG=reacher`. + // View access logs by setting `RUST_LOG=reacher_backend`. 
.with(warp::log(LOG_TARGET)) } diff --git a/backend/src/routes/bulk/results/csv_helper.rs b/backend/src/http/bulk/results/csv_helper.rs similarity index 100% rename from backend/src/routes/bulk/results/csv_helper.rs rename to backend/src/http/bulk/results/csv_helper.rs diff --git a/backend/src/routes/bulk/results/mod.rs b/backend/src/http/bulk/results/mod.rs similarity index 96% rename from backend/src/routes/bulk/results/mod.rs rename to backend/src/http/bulk/results/mod.rs index dbcd635de..b19dbc9fa 100644 --- a/backend/src/routes/bulk/results/mod.rs +++ b/backend/src/http/bulk/results/mod.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; use sqlx::{Executor, Pool, Postgres, Row}; use std::convert::TryInto; use std::iter::Iterator; +use tracing::error; use warp::Filter; use super::{ @@ -68,11 +69,10 @@ async fn job_result( .fetch_one(&conn_pool) .await .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to fetch total_records for [job={}] with [error={}]", - job_id, - e + job_id, e ); BulkError::from(e) })? @@ -84,11 +84,10 @@ async fn job_result( .fetch_one(&conn_pool) .await .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to get total_processed for [job={}] with [error={}]", - job_id, - e + job_id, e ); BulkError::from(e) })? 
@@ -107,11 +106,10 @@ async fn job_result( let reply = serde_json::to_vec(&JobResultJsonResponse { results: data }).map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to convert json results to string for [job={}] with [error={}]", - job_id, - e + job_id, e ); BulkError::Json(e) @@ -151,7 +149,7 @@ async fn job_result_as_iter( ); let rows = conn_pool.fetch_all(query).await.map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to get results for [job={}] [limit={}] [offset={}] with [error={}]", job_id, @@ -196,7 +194,7 @@ async fn job_result_csv( for json_value in rows { let result_csv: JobResultCsvResponse = CsvWrapper(json_value).try_into().map_err(|e: &'static str| { - log::error!( + error!( target: LOG_TARGET, "Failed to convert json to csv output struct for [job={}] [limit={}] [offset={}] to csv with [error={}]", job_id, @@ -208,7 +206,7 @@ async fn job_result_csv( BulkError::Csv(CsvError::Parse(e)) })?; wtr.serialize(result_csv).map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to serialize result for [job={}] [limit={}] [offset={}] to csv with [error={}]", job_id, @@ -222,7 +220,7 @@ async fn job_result_csv( } let data = wtr.into_inner().map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to convert results for [job={}] [limit={}] [offset={}] to csv with [error={}]", job_id, @@ -245,6 +243,6 @@ pub fn get_bulk_job_result( .and(with_db(o)) .and(warp::query::()) .and_then(job_result) - // View access logs by setting `RUST_LOG=reacher`. + // View access logs by setting `RUST_LOG=reacher_backend`. .with(warp::log(LOG_TARGET)) } diff --git a/backend/src/routes/bulk/task.rs b/backend/src/http/bulk/task.rs similarity index 97% rename from backend/src/routes/bulk/task.rs rename to backend/src/http/bulk/task.rs index c33134e15..e4bc07ac3 100644 --- a/backend/src/routes/bulk/task.rs +++ b/backend/src/http/bulk/task.rs @@ -16,16 +16,18 @@ //! This file implements the `POST /bulk` endpoint. 
-use super::error::BulkError; -use crate::check::check_email; use check_if_email_exists::LOG_TARGET; use check_if_email_exists::{CheckEmailInput, CheckEmailInputProxy, CheckEmailOutput, Reachable}; use serde::{Deserialize, Serialize}; use sqlx::{Pool, Postgres}; use sqlxmq::{job, CurrentJob}; use std::error::Error; +use tracing::{debug, error}; use uuid::Uuid; +use super::error::BulkError; +use crate::check::check_email; + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TaskInput { // fields for CheckEmailInput @@ -104,11 +106,10 @@ pub async fn submit_job( .builder() .set_json(&task_payload) .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to submit task with the following [input={:?}] with [error={}]", - task_payload.input, - e + task_payload.input, e ); BulkError::Json(e) @@ -116,11 +117,10 @@ pub async fn submit_job( .spawn(conn_pool) .await .map_err(|e| { - log::error!( + error!( target: LOG_TARGET, "Failed to submit task for [bulk_req={}] with [error={}]", - job_id, - e + job_id, e ); e @@ -150,7 +150,7 @@ pub async fn email_verification_task( let mut final_response: Option = None; for check_email_input in task_payload.input { - log::debug!( + debug!( target: LOG_TARGET, "Starting task [email={}] for [job={}] and [uuid={}]", check_email_input.to_email, @@ -161,7 +161,7 @@ pub async fn email_verification_task( let to_email = check_email_input.to_email.clone(); let response = check_email(check_email_input).await; - log::debug!( + debug!( target: LOG_TARGET, "Got task result [email={}] for [job={}] and [uuid={}] with [is_reachable={:?}]", to_email, @@ -204,8 +204,8 @@ pub async fn email_verification_task( .fetch_optional(current_job.pool()) .await .map_err(|e| { - log::error!( - target:LOG_TARGET, + error!( + target: LOG_TARGET, "Failed to write [email={}] result to db for [job={}] and [uuid={}] with [error={}]", response.input, job_id, @@ -216,7 +216,7 @@ pub async fn email_verification_task( e })?; - log::debug!( + debug!( target: 
LOG_TARGET, "Wrote result for [email={}] for [job={}] and [uuid={}]", response.input, diff --git a/backend/src/routes/check_email/mod.rs b/backend/src/http/check_email/mod.rs similarity index 100% rename from backend/src/routes/check_email/mod.rs rename to backend/src/http/check_email/mod.rs diff --git a/backend/src/routes/check_email/post.rs b/backend/src/http/check_email/post.rs similarity index 100% rename from backend/src/routes/check_email/post.rs rename to backend/src/http/check_email/post.rs diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs new file mode 100644 index 000000000..69e44bf06 --- /dev/null +++ b/backend/src/http/mod.rs @@ -0,0 +1,96 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod bulk; +pub mod check_email; +mod version; + +use std::env; +use std::net::IpAddr; + +use check_if_email_exists::LOG_TARGET; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use tracing::info; +use warp::Filter; + +use super::errors; + +pub fn create_routes( + o: Option>, +) -> impl Filter + Clone { + version::get::get_version() + .or(check_email::post::post_check_email()) + // The 3 following routes will 404 if o is None. 
+ .or(bulk::post::create_bulk_job(o.clone())) + .or(bulk::get::get_bulk_job_status(o.clone())) + .or(bulk::results::get_bulk_job_result(o)) + .recover(errors::handle_rejection) +} + +/// Runs the Warp server. +/// +/// This function starts the Warp server and listens for incoming requests. +/// It returns a `Result` indicating whether the server started successfully or encountered an error. +pub async fn run_warp_server() -> Result<(), Box> { + let host = env::var("RCH_HTTP_HOST") + .unwrap_or_else(|_| "127.0.0.1".into()) + .parse::() + .expect("Environment variable RCH_HTTP_HOST is malformed."); + let port = env::var("PORT") + .map(|port: String| { + port.parse::() + .expect("Environment variable PORT is malformed.") + }) + .unwrap_or(8080); + + let is_bulk_enabled = env::var("RCH_ENABLE_BULK").unwrap_or_else(|_| "0".into()) == "1"; + let db = if is_bulk_enabled { + let pool = create_db().await?; + let _registry = bulk::create_job_registry(&pool).await?; + Some(pool) + } else { + None + }; + + let routes = create_routes(db); + + info!(target: LOG_TARGET, host=?host,port=?port, "Server is listening"); + warp::serve(routes).run((host, port)).await; + + Ok(()) +} + +/// Create a DB pool. 
+async fn create_db() -> Result, sqlx::Error> { + let pg_conn = + env::var("DATABASE_URL").expect("Environment variable DATABASE_URL should be set"); + let pg_max_conn = env::var("RCH_DATABASE_MAX_CONNECTIONS").map_or(5, |var| { + var.parse::() + .expect("Environment variable RCH_DATABASE_MAX_CONNECTIONS should parse to u32") + }); + + // create connection pool with database + // connection pool internally the shared db connection + // with arc so it can safely be cloned and shared across threads + let pool = PgPoolOptions::new() + .max_connections(pg_max_conn) + .connect(pg_conn.as_str()) + .await?; + + sqlx::migrate!("./migrations").run(&pool).await?; + + Ok(pool) +} diff --git a/backend/src/routes/version/get.rs b/backend/src/http/version/get.rs similarity index 100% rename from backend/src/routes/version/get.rs rename to backend/src/http/version/get.rs diff --git a/backend/src/routes/version/mod.rs b/backend/src/http/version/mod.rs similarity index 100% rename from backend/src/routes/version/mod.rs rename to backend/src/http/version/mod.rs diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 8b28dc8c7..e8ef4a937 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -16,5 +16,7 @@ pub mod check; mod errors; -pub mod routes; +pub mod http; pub mod sentry_util; +#[cfg(feature = "worker")] +pub mod worker; diff --git a/backend/src/main.rs b/backend/src/main.rs index c8196a22d..9872c75e4 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -18,119 +18,33 @@ //! functions, depending on whether the `bulk` feature is enabled or not. 
 use check_if_email_exists::LOG_TARGET;
-use dotenv::dotenv;
-use reacher_backend::routes::{bulk::email_verification_task, create_routes};
-use reacher_backend::sentry_util::{setup_sentry, CARGO_PKG_VERSION};
-use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
-use sqlxmq::{JobRegistry, JobRunnerHandle};
-use std::{env, net::IpAddr};
-use warp::Filter;
+#[cfg(feature = "worker")]
+use futures::try_join;
+use tracing::info;
+
+#[cfg(feature = "worker")]
+use reacher_backend::worker::run_worker;
+use reacher_backend::{
+	http::run_warp_server,
+	sentry_util::{setup_sentry, CARGO_PKG_VERSION},
+};
 
 /// Run a HTTP server using warp with bulk endpoints.
+/// If the worker feature is enabled, this function will also start a worker
+/// that listens to an AMQP message queue.
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-	init_logger();
+	// Initialize logging.
+	tracing_subscriber::fmt::init();
+	info!(target: LOG_TARGET, version=?CARGO_PKG_VERSION, "Running Reacher");
 
 	// Setup sentry bug tracking.
-	let _guard = setup_sentry();
-
-	let is_bulk_enabled = env::var("RCH_ENABLE_BULK").unwrap_or_else(|_| "0".into()) == "1";
-	if is_bulk_enabled {
-		let pool = create_db().await?;
-		let _registry = create_job_registry(&pool).await?;
-		let routes = create_routes(Some(pool));
-		run_warp_server(routes).await?;
-	} else {
-		let routes = create_routes(None);
-		run_warp_server(routes).await?;
-	}
-
-	Ok(())
-}
-
-fn init_logger() {
-	// Read from .env file if present.
-	let _ = dotenv();
-	env_logger::init();
-	log::info!(target: LOG_TARGET, "Running Reacher v{}", CARGO_PKG_VERSION);
-}
-
-/// Create a DB pool.
-pub async fn create_db() -> Result, sqlx::Error> { - let pg_conn = - env::var("DATABASE_URL").expect("Environment variable DATABASE_URL should be set"); - let pg_max_conn = env::var("RCH_DATABASE_MAX_CONNECTIONS").map_or(5, |var| { - var.parse::() - .expect("Environment variable RCH_DATABASE_MAX_CONNECTIONS should parse to u32") - }); - - // create connection pool with database - // connection pool internally the shared db connection - // with arc so it can safely be cloned and shared across threads - let pool = PgPoolOptions::new() - .max_connections(pg_max_conn) - .connect(pg_conn.as_str()) - .await?; - - sqlx::migrate!("./migrations").run(&pool).await?; - - Ok(pool) -} - -/// Create a job registry with one task: the email verification task. -async fn create_job_registry(pool: &Pool) -> Result { - let min_task_conc = env::var("RCH_MINIMUM_TASK_CONCURRENCY").map_or(10, |var| { - var.parse::() - .expect("Environment variable RCH_MINIMUM_TASK_CONCURRENCY should parse to usize") - }); - let max_conc_task_fetch = env::var("RCH_MAXIMUM_CONCURRENT_TASK_FETCH").map_or(20, |var| { - var.parse::() - .expect("Environment variable RCH_MAXIMUM_CONCURRENT_TASK_FETCH should parse to usize") - }); - - // registry needs to be given list of jobs it can accept - let registry = JobRegistry::new(&[email_verification_task]); - - // create runner for the message queue associated - // with this job registry - let registry = registry - // Create a job runner using the connection pool. - .runner(pool) - // Here is where you can configure the job runner - // Aim to keep 10-20 jobs running at a time. - .set_concurrency(min_task_conc, max_conc_task_fetch) - // Start the job runner in the background. - .run() - .await?; - - log::info!( - target: LOG_TARGET, - "Bulk endpoints enabled with concurrency min={min_task_conc} to max={max_conc_task_fetch}." 
-	);
-
-	Ok(registry)
-}
+	let _guard: sentry::ClientInitGuard = setup_sentry();
 
-async fn run_warp_server(
-	routes: impl Filter<Extract = impl warp::Reply, Error = warp::Rejection>
-		+ Clone
-		+ Send
-		+ Sync
-		+ 'static,
-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-	let host = env::var("RCH_HTTP_HOST")
-		.unwrap_or_else(|_| "127.0.0.1".into())
-		.parse::<IpAddr>()
-		.expect("Environment variable RCH_HTTP_HOST is malformed.");
-	let port = env::var("PORT")
-		.map(|port| {
-			port.parse::<u16>()
-				.expect("Environment variable PORT is malformed.")
-		})
-		.unwrap_or(8080);
-	println!("Server is listening on {host}:{port}.");
+	let _http_server = run_warp_server();
 
-	warp::serve(routes).run((host, port)).await;
+	#[cfg(feature = "worker")]
+	try_join!(_http_server, run_worker())?;
+
+	// NOTE(review): without the "worker" feature the future returned by
+	// `run_warp_server` would otherwise never be polled, so the HTTP server
+	// would never start. Await it explicitly in that configuration.
+	#[cfg(not(feature = "worker"))]
+	_http_server.await?;
 
 	Ok(())
 }
diff --git a/backend/src/routes/bulk/mod.rs b/backend/src/routes/bulk/mod.rs
deleted file mode 100644
index 1332bfd83..000000000
--- a/backend/src/routes/bulk/mod.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-// Reacher - Email Verification
-// Copyright (C) 2018-2023 Reacher
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published
-// by the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <https://www.gnu.org/licenses/>.
- -mod db; -mod error; -pub mod get; -pub mod post; -pub mod results; -mod task; - -pub use task::email_verification_task; diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs deleted file mode 100644 index ed3df73cd..000000000 --- a/backend/src/routes/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -pub mod bulk; -pub mod check_email; -mod version; - -use super::errors; -use sqlx::{Pool, Postgres}; -use warp::Filter; - -pub fn create_routes( - o: Option>, -) -> impl Filter + Clone { - version::get::get_version() - .or(check_email::post::post_check_email()) - // The 3 following routes will 404 if o is None. 
- .or(bulk::post::create_bulk_job(o.clone())) - .or(bulk::get::get_bulk_job_status(o.clone())) - .or(bulk::results::get_bulk_job_result(o)) - .recover(errors::handle_rejection) -} diff --git a/backend/src/sentry_util.rs b/backend/src/sentry_util.rs index d1d93fbb8..481df2d50 100644 --- a/backend/src/sentry_util.rs +++ b/backend/src/sentry_util.rs @@ -29,6 +29,7 @@ use check_if_email_exists::mx::MxError; use check_if_email_exists::LOG_TARGET; use check_if_email_exists::{smtp::SmtpError, CheckEmailOutput}; use sentry::protocol::{Event, Exception, Level, Values}; +use tracing::{debug, info}; use super::sentry_util; @@ -40,7 +41,7 @@ pub fn setup_sentry() -> sentry::ClientInitGuard { // will just silently ignore. let sentry = sentry::init(env::var("RCH_SENTRY_DSN").unwrap_or_else(|_| "".into())); if sentry.is_enabled() { - log::info!(target: LOG_TARGET, "Sentry is successfully set up.") + info!(target: LOG_TARGET, "Sentry is successfully set up.") } sentry @@ -83,11 +84,7 @@ impl<'a> SentryError<'a> { /// info before sending to Sentry, by removing all instances of `username`. fn error(err: SentryError, result: &CheckEmailOutput) { let exception_value = redact(format!("{err:?}").as_str(), &result.syntax.username); - log::debug!( - target: LOG_TARGET, - "Sending error to Sentry: {}", - exception_value - ); + debug!(target: LOG_TARGET, "Sending error to Sentry: {}", exception_value); let exception = Exception { ty: err.get_exception_type(), @@ -148,8 +145,7 @@ pub fn log_unknown_errors(result: &CheckEmailOutput) { { // If the SMTP error is transient and known, we don't track it in // Sentry, just log it locally. 
- log::debug!( - target: LOG_TARGET, + debug!(target: LOG_TARGET, "Transient error: {}", redact( response.message[0].as_str(), diff --git a/worker/src/worker.rs b/backend/src/worker/check_email.rs similarity index 62% rename from worker/src/worker.rs rename to backend/src/worker/check_email.rs index fde1ae3ae..f823a14f2 100644 --- a/worker/src/worker.rs +++ b/backend/src/worker/check_email.rs @@ -14,17 +14,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::env; - -use check_if_email_exists::CheckEmailInput; -use check_if_email_exists::{check_email as ciee_check_email, CheckEmailOutput}; +use check_if_email_exists::LOG_TARGET; +use check_if_email_exists::{CheckEmailInput, CheckEmailOutput}; use lapin::message::Delivery; use lapin::options::*; use serde::Deserialize; use serde::Serialize; use tracing::{debug, info}; -use crate::sentry_util::log_unknown_errors; +use crate::check::check_email; #[derive(Debug, Deserialize)] pub struct CheckEmailPayload { @@ -49,11 +47,11 @@ pub async fn process_check_email( delivery: Delivery, ) -> Result<(), Box> { let payload = serde_json::from_slice::(&delivery.data)?; - info!(email=?payload.input.to_email, "Start check"); - debug!(payload=?payload); + info!(target: LOG_TARGET, email=?payload.input.to_email, "New job"); + debug!(target: LOG_TARGET, payload=?payload); let output = check_email(payload.input).await; - debug!(email=output.input,output=?output, "Done check-if-email-exists"); + debug!(target: LOG_TARGET, email=output.input,output=?output, "Done check-if-email-exists"); // Check if we have a webhook to send the output to. if let Some(webhook) = payload.webhook { @@ -71,34 +69,11 @@ pub async fn process_check_email( .await? 
.text() .await?; - debug!(email=?webhook_output.output.input,res=?res, "Received webhook response"); - info!(email=?webhook_output.output.input, "Finished check"); + debug!(target: LOG_TARGET, email=?webhook_output.output.input,res=?res, "Received webhook response"); + info!(target: LOG_TARGET, email=?webhook_output.output.input, is_reachable=?webhook_output.output.is_reachable, "Finished check"); } delivery.ack(BasicAckOptions::default()).await?; Ok(()) } - -/// Same as `check-if-email-exists`'s check email, but adds some additional -/// inputs and error handling. -async fn check_email(input: CheckEmailInput) -> CheckEmailOutput { - let from_email = - env::var("RCH_FROM_EMAIL").unwrap_or_else(|_| CheckEmailInput::default().from_email); - let hello_name: String = - env::var("RCH_HELLO_NAME").unwrap_or_else(|_| CheckEmailInput::default().hello_name); - - let input = CheckEmailInput { - // If we want to override core check-if-email-exists's default values - // for CheckEmailInput for the backend, we do it here. - from_email, - hello_name, - ..input - }; - - let res = ciee_check_email(&input).await; - - log_unknown_errors(&res); - - res -} diff --git a/worker/src/main.rs b/backend/src/worker/mod.rs similarity index 86% rename from worker/src/main.rs rename to backend/src/worker/mod.rs index 133b12393..e1dc5dafc 100644 --- a/worker/src/main.rs +++ b/backend/src/worker/mod.rs @@ -16,23 +16,16 @@ use std::env; +use check_if_email_exists::LOG_TARGET; use futures_lite::StreamExt; use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties}; use tracing::{error, info}; -mod sentry_util; -mod worker; +mod check_email; -use sentry_util::setup_sentry; -use worker::process_check_email; - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Setup sentry bug tracking. 
-	let _guard = setup_sentry();
-
-	tracing_subscriber::fmt::init();
+use check_email::process_check_email;
 
+pub async fn run_worker() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 	// Make sure the worker is well configured.
 	let addr = env::var("RCH_AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672".into());
 	let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set");
@@ -52,7 +45,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 	// Receive channel
 	let channel = conn.create_channel().await?;
 
-	info!(backend=?backend_name,state=?conn.status().state(), "Connected to AMQP broker");
+	info!(target: LOG_TARGET, backend=?backend_name,state=?conn.status().state(), "Connected to AMQP broker");
 
 	// Create queue "check_email.{Smtp,Headless}" with priority.
 	let queue_name = format!("check_email.{:?}", verif_method);
@@ -70,7 +63,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 	)
 	.await?;
 
-	info!(queue=?queue_name, "Worker will start consuming messages");
+	info!(target: LOG_TARGET, queue=?queue_name, "Worker will start consuming messages");
 	let mut consumer = channel
 		.basic_consume(
 			&queue_name,
@@ -85,7 +78,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 		tokio::spawn(async move {
 			let res = process_check_email(delivery).await;
 			if let Err(err) = res {
-				error!(error=?err, "Error processing message");
+				error!(target: LOG_TARGET, error=?err, "Error processing message");
 			}
 		});
 	}
diff --git a/backend/tests/check_email.rs b/backend/tests/check_email.rs
index add48e090..dbe374994 100644
--- a/backend/tests/check_email.rs
+++ b/backend/tests/check_email.rs
@@ -18,8 +18,7 @@ use std::env;
 use check_if_email_exists::CheckEmailInput;
 use reacher_backend::check::REACHER_SECRET_HEADER;
-use reacher_backend::routes::create_routes;
-
+use reacher_backend::http::create_routes;
 use warp::http::StatusCode;
 use warp::test::request;
@@ -39,7 +38,6 @@ async fn test_input_foo_bar() {
 		.await;
 
 	assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body());
-	println!("{:?}", resp.body());
assert!(resp.body().starts_with(FOO_BAR_RESPONSE.as_bytes())); } diff --git a/backend/docker.sh b/docker.sh similarity index 100% rename from backend/docker.sh rename to docker.sh diff --git a/worker/Cargo.toml b/worker/Cargo.toml deleted file mode 100644 index e59c902ea..000000000 --- a/worker/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "reacher_worker" -version = "1.0.0" -edition = "2018" -description = "RabbitMQ worker for Reacher" -authors = ["Amaury "] -license = "AGPL-3.0" -publish = false - -[dependencies] -async-smtp = "0.6" -async-global-executor = "2.4.0" -check-if-email-exists = { path = "../core", features = ["headless"] } -futures-lite = "2.1.0" -lapin = "2.3.1" -reqwest = { version = "0.11.22", features = ["json"] } -sentry = "0.23" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -tokio = { version = "1.29", features = ["macros"] } -tokio-executor-trait = "2.1.1" -tokio-reactor-trait = "1.1.0" -tracing = "0.1.40" -tracing-subscriber = "0.3.18" diff --git a/worker/README.md b/worker/README.md deleted file mode 100644 index 3442a3504..000000000 --- a/worker/README.md +++ /dev/null @@ -1,12 +0,0 @@ -[![Docker](https://img.shields.io/docker/v/reacherhq/worker?color=0db7ed&label=docker&sort=date)](https://hub.docker.com/r/reacherhq/worker) -[![Actions Status](https://github.com/reacherhq/check-if-email-exists/workflows/pr/badge.svg)](https://github.com/reacherhq/check-if-email-exists/actions) - -

- -

reacher

-

👷 Reacher Worker

-

RabbitMQ worker for Reacher Email Verification API: https://reacher.email.

- -

- -This crate holds a RabbitMQ worker for [Reacher](https://reacher.email). More documentation coming soon... diff --git a/worker/docker.sh b/worker/docker.sh deleted file mode 100755 index 9c14fd55c..000000000 --- a/worker/docker.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/ash - -# https://docs.docker.com/config/containers/multi-service_container/ -chromedriver & -./reacher_worker diff --git a/worker/src/sentry_util.rs b/worker/src/sentry_util.rs deleted file mode 100644 index 38a1e6ca9..000000000 --- a/worker/src/sentry_util.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -//! Helper functions to send events to Sentry. -//! -//! This module also contains functions that check if the error's given by -//! `check-if-email-exists` are known errors, in which case we don't log them -//! to Sentry. - -use std::borrow::Cow; -use std::env; - -use async_smtp::smtp::error::Error as AsyncSmtpError; -use check_if_email_exists::misc::MiscError; -use check_if_email_exists::mx::MxError; -use check_if_email_exists::{smtp::SmtpError, CheckEmailOutput}; -use sentry::protocol::{Event, Exception, Level, Values}; -use tracing::{debug, info}; - -use super::sentry_util; - -pub const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); - -/// Setup Sentry. 
-pub fn setup_sentry() -> sentry::ClientInitGuard { - // Use an empty string if we don't have any env variable for sentry. Sentry - // will just silently ignore. - let sentry = sentry::init(env::var("RCH_SENTRY_DSN").unwrap_or_else(|_| "".into())); - if sentry.is_enabled() { - info!("Sentry is successfully set up.") - } - - sentry -} - -/// If RCH_BACKEND_NAME environment variable is set, add it to the sentry -/// `server_name` properties. -/// For backwards compatibility, we also support HEROKU_APP_NAME env variable. -fn get_backend_name<'a>() -> Option> { - if let Ok(n) = env::var("RCH_BACKEND_NAME") { - return Some(n.into()); - } else if let Ok(n) = env::var("HEROKU_APP_NAME") { - return Some(n.into()); - } - - None -} - -#[derive(Debug)] -enum SentryError<'a> { - // TODO: Probably a good idea would be to `impl std:error:Error` for the - // three errors below. - Misc(&'a MiscError), - Mx(&'a MxError), - Smtp(&'a SmtpError), -} - -impl<'a> SentryError<'a> { - /// Get the error type to be passed into Sentry's Exception `ty` field. - fn get_exception_type(&self) -> String { - match self { - SentryError::Misc(_) => "MiscError".into(), - SentryError::Mx(_) => "MxError".into(), - SentryError::Smtp(_) => "SmtpError".into(), - } - } -} - -/// Helper function to send an Error event to Sentry. We redact all sensitive -/// info before sending to Sentry, by removing all instances of `username`. 
-fn error(err: SentryError, result: &CheckEmailOutput) { - let exception_value = redact(format!("{err:?}").as_str(), &result.syntax.username); - debug!("Sending error to Sentry: {}", exception_value); - - let exception = Exception { - ty: err.get_exception_type(), - value: Some(exception_value), - ..Default::default() - }; - - sentry::capture_event(Event { - exception: Values { - values: vec![exception], - }, - level: Level::Error, - environment: Some("production".into()), - release: Some(CARGO_PKG_VERSION.into()), - message: Some(redact( - format!("{result:#?}").as_str(), - &result.syntax.username, - )), - server_name: get_backend_name(), - transaction: Some(format!("check_email:{}", result.syntax.domain)), - ..Default::default() - }); -} - -/// Function to replace all usernames from email, and replace them with -/// `***@domain.com` for privacy reasons. -fn redact(input: &str, username: &str) -> String { - input.replace(username, "***") -} - -/// Check if the message contains known SMTP Transient errors. -fn skip_smtp_transient_errors(message: &[String]) -> bool { - let first_line = message[0].to_lowercase(); - - // 4.3.2 Please try again later - first_line.contains("try again") || - // Temporary local problem - please try later - first_line.contains("try later") -} - -/// Checks if the output from `check-if-email-exists` has a known error, in -/// which case we don't log to Sentry to avoid spamming it. -pub fn log_unknown_errors(result: &CheckEmailOutput) { - match (&result.misc, &result.mx, &result.smtp) { - (Err(err), _, _) => { - // We log all misc errors. - sentry_util::error(SentryError::Misc(err), result); - } - (_, Err(err), _) => { - // We log all mx errors. - sentry_util::error(SentryError::Mx(err), result); - } - (_, _, Err(err)) if err.get_description().is_some() => { - // If the SMTP error is known, we don't track it in Sentry. 
- } - (_, _, Err(SmtpError::SmtpError(AsyncSmtpError::Transient(response)))) - if skip_smtp_transient_errors(&response.message) => - { - // If the SMTP error is transient and known, we don't track it in - // Sentry, just log it locally. - let redacted = redact( - response.message[0].as_str(), - result.syntax.username.as_str(), - ); - debug!(email=?redacted, "Transient error"); - } - (_, _, Err(err)) => { - // If it's a SMTP error we didn't catch above, we log to - // Sentry, to be able to debug them better. We don't want to - // spam Sentry and log all instances of the error, hence the - // `count` check. - sentry_util::error(SentryError::Smtp(err), result); - } - // If everything is ok, we just return the result. - (Ok(_), Ok(_), Ok(_)) => {} - } -} - -#[cfg(test)] -mod tests { - use super::redact; - - #[test] - fn test_redact() { - assert_eq!("***@gmail.com", redact("someone@gmail.com", "someone")); - assert_eq!( - "my email is ***@gmail.com.", - redact("my email is someone@gmail.com.", "someone") - ); - assert_eq!( - "my email is ***@gmail.com., I repeat, my email is ***@gmail.com.", - redact( - "my email is someone@gmail.com., I repeat, my email is someone@gmail.com.", - "someone" - ) - ); - assert_eq!( - "*** @ gmail . com", - redact("someone @ gmail . com", "someone") - ); - assert_eq!("*** is here.", redact("someone is here.", "someone")); - } -}