From d9302d4c1cec6a5a1788afe2a3718df8986f118f Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1729@users.noreply.github.com> Date: Fri, 22 Dec 2023 14:03:49 +0100 Subject: [PATCH] feat(backend): Add POST /v1/bulk (#1413) * Move bulk to v0 * Add v1 bulk route * Make it compile * Make workers work * Make it work * fix tests * Add length in response --- backend/Cargo.toml | 3 +- backend/src/http/mod.rs | 38 +++-- backend/src/http/{ => v0}/bulk/db.rs | 0 backend/src/http/{ => v0}/bulk/error.rs | 0 backend/src/http/{ => v0}/bulk/get.rs | 0 backend/src/http/{ => v0}/bulk/mod.rs | 0 backend/src/http/{ => v0}/bulk/post.rs | 2 +- .../http/{ => v0}/bulk/results/csv_helper.rs | 0 backend/src/http/{ => v0}/bulk/results/mod.rs | 0 backend/src/http/{ => v0}/bulk/task.rs | 0 backend/src/http/{ => v0}/check_email/mod.rs | 0 backend/src/http/{ => v0}/check_email/post.rs | 2 +- backend/src/http/v0/mod.rs | 2 + backend/src/http/v1/bulk/error.rs | 40 +++++ backend/src/http/v1/bulk/mod.rs | 2 + backend/src/http/v1/bulk/post.rs | 148 ++++++++++++++++++ backend/src/http/v1/mod.rs | 2 + backend/src/main.rs | 25 ++- backend/src/worker/check_email.rs | 4 +- backend/src/worker/mod.rs | 28 ++-- backend/tests/check_email.rs | 14 +- 21 files changed, 276 insertions(+), 34 deletions(-) rename backend/src/http/{ => v0}/bulk/db.rs (100%) rename backend/src/http/{ => v0}/bulk/error.rs (100%) rename backend/src/http/{ => v0}/bulk/get.rs (100%) rename backend/src/http/{ => v0}/bulk/mod.rs (100%) rename backend/src/http/{ => v0}/bulk/post.rs (98%) rename backend/src/http/{ => v0}/bulk/results/csv_helper.rs (100%) rename backend/src/http/{ => v0}/bulk/results/mod.rs (100%) rename backend/src/http/{ => v0}/bulk/task.rs (100%) rename backend/src/http/{ => v0}/check_email/mod.rs (100%) rename backend/src/http/{ => v0}/check_email/post.rs (96%) create mode 100644 backend/src/http/v0/mod.rs create mode 100644 backend/src/http/v1/bulk/error.rs create mode 100644 backend/src/http/v1/bulk/mod.rs create mode 100644 backend/src/http/v1/bulk/post.rs create mode 100644 backend/src/http/v1/mod.rs diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 71f4067bd..843e91292 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -12,7 +12,7 @@ csv = "1.3.0" dotenv = "0.15.0" futures = { version = "0.3.29", optional = true } futures-lite = { version = "2.1.0", optional = true } -lapin = { version = "2.3.1", optional = true } +lapin = { version = "2.3.1" } log = "0.4" openssl = { version = "0.10.57", features = ["vendored"] } reqwest = { version = "0.11.22", features = ["json"], optional = true } @@ -40,7 +40,6 @@ warp = "0.3" worker = [ "futures", "futures-lite", - "lapin", "reqwest", "tokio-executor-trait", "tokio-reactor-trait", diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs index 69e44bf06..337035646 100644 --- a/backend/src/http/mod.rs +++ b/backend/src/http/mod.rs @@ -14,29 +14,47 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -mod bulk; -pub mod check_email; +mod v0; +mod v1; mod version; use std::env; use std::net::IpAddr; use check_if_email_exists::LOG_TARGET; +use lapin::Channel; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use tracing::info; use warp::Filter; use super::errors; +#[cfg(feature = "worker")] pub fn create_routes( o: Option>, + channel: Option, ) -> impl Filter + Clone { version::get::get_version() - .or(check_email::post::post_check_email()) + .or(v0::check_email::post::post_check_email()) + .or(v1::bulk::post::create_bulk_job(channel)) // The 3 following routes will 404 if o is None. - .or(bulk::post::create_bulk_job(o.clone())) - .or(bulk::get::get_bulk_job_status(o.clone())) - .or(bulk::results::get_bulk_job_result(o)) + .or(v0::bulk::post::create_bulk_job(o.clone())) + .or(v0::bulk::get::get_bulk_job_status(o.clone())) + .or(v0::bulk::results::get_bulk_job_result(o)) + .recover(errors::handle_rejection) +} + +#[cfg(not(feature = "worker"))] +pub fn create_routes( + o: Option>, + _channel: Option, +) -> impl Filter + Clone { + version::get::get_version() + .or(v0::check_email::post::post_check_email()) + // The 3 following routes will 404 if o is None. + .or(v0::bulk::post::create_bulk_job(o.clone())) + .or(v0::bulk::get::get_bulk_job_status(o.clone())) + .or(v0::bulk::results::get_bulk_job_result(o)) .recover(errors::handle_rejection) } @@ -44,7 +62,9 @@ pub fn create_routes( /// /// This function starts the Warp server and listens for incoming requests. /// It returns a `Result` indicating whether the server started successfully or encountered an error. -pub async fn run_warp_server() -> Result<(), Box> { +pub async fn run_warp_server( + channel: Option, +) -> Result<(), Box> { let host = env::var("RCH_HTTP_HOST") .unwrap_or_else(|_| "127.0.0.1".into()) .parse::() @@ -59,13 +79,13 @@ pub async fn run_warp_server() -> Result<(), Box. -//! This file implements the `POST /bulk` endpoint. +//! This file implements the `POST /v0/bulk` endpoint. use check_if_email_exists::CheckEmailInputProxy; use check_if_email_exists::LOG_TARGET; diff --git a/backend/src/http/bulk/results/csv_helper.rs b/backend/src/http/v0/bulk/results/csv_helper.rs similarity index 100% rename from backend/src/http/bulk/results/csv_helper.rs rename to backend/src/http/v0/bulk/results/csv_helper.rs diff --git a/backend/src/http/bulk/results/mod.rs b/backend/src/http/v0/bulk/results/mod.rs similarity index 100% rename from backend/src/http/bulk/results/mod.rs rename to backend/src/http/v0/bulk/results/mod.rs diff --git a/backend/src/http/bulk/task.rs b/backend/src/http/v0/bulk/task.rs similarity index 100% rename from backend/src/http/bulk/task.rs rename to backend/src/http/v0/bulk/task.rs diff --git a/backend/src/http/check_email/mod.rs b/backend/src/http/v0/check_email/mod.rs similarity index 100% rename from backend/src/http/check_email/mod.rs rename to backend/src/http/v0/check_email/mod.rs diff --git a/backend/src/http/check_email/post.rs b/backend/src/http/v0/check_email/post.rs similarity index 96% rename from backend/src/http/check_email/post.rs rename to backend/src/http/v0/check_email/post.rs index 40e7f47a3..677f117c2 100644 --- a/backend/src/http/check_email/post.rs +++ b/backend/src/http/v0/check_email/post.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -//! This file implements the `POST /check_email` endpoint. +//! This file implements the `POST /v0/check_email` endpoint. use check_if_email_exists::CheckEmailInput; use check_if_email_exists::LOG_TARGET; diff --git a/backend/src/http/v0/mod.rs b/backend/src/http/v0/mod.rs new file mode 100644 index 000000000..92bd84116 --- /dev/null +++ b/backend/src/http/v0/mod.rs @@ -0,0 +1,2 @@ +pub mod bulk; +pub mod check_email; diff --git a/backend/src/http/v1/bulk/error.rs b/backend/src/http/v1/bulk/error.rs new file mode 100644 index 000000000..7269a43fa --- /dev/null +++ b/backend/src/http/v1/bulk/error.rs @@ -0,0 +1,40 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use warp::reject; + +/// Catch all error struct for the bulk endpoints +#[derive(Debug)] +pub enum BulkError { + EmptyInput, + Serde(serde_json::Error), + Lapin(lapin::Error), +} + +// Defaults to Internal server error +impl reject::Reject for BulkError {} + +impl From for BulkError { + fn from(e: serde_json::Error) -> Self { + BulkError::Serde(e) + } +} + +impl From for BulkError { + fn from(e: lapin::Error) -> Self { + BulkError::Lapin(e) + } +} diff --git a/backend/src/http/v1/bulk/mod.rs b/backend/src/http/v1/bulk/mod.rs new file mode 100644 index 000000000..245145f31 --- /dev/null +++ b/backend/src/http/v1/bulk/mod.rs @@ -0,0 +1,2 @@ +pub mod error; +pub mod post; diff --git a/backend/src/http/v1/bulk/post.rs b/backend/src/http/v1/bulk/post.rs new file mode 100644 index 000000000..027376730 --- /dev/null +++ b/backend/src/http/v1/bulk/post.rs @@ -0,0 +1,148 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! This file implements the `POST /v1/bulk` endpoint. + +use check_if_email_exists::CheckEmailInput; +use check_if_email_exists::CheckEmailInputProxy; +use check_if_email_exists::LOG_TARGET; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; +use lapin::Channel; +use lapin::{options::*, BasicProperties}; +use serde::{Deserialize, Serialize}; +use tracing::debug; +use warp::Filter; + +use super::error::BulkError; +use crate::check::check_header; +use crate::worker::check_email::CheckEmailPayload; +use crate::worker::check_email::CheckEmailWebhook; + +/// Endpoint request body. +#[derive(Debug, Deserialize)] +struct CreateBulkRequest { + input: Vec, + proxy: Option, + hello_name: Option, + from_email: Option, + webhook: Option, +} + +/// Endpoint response body. +#[derive(Clone, Debug, Deserialize, Serialize)] +struct CreateBulkResponse { + message: String, +} + +async fn create_bulk_request( + channel: Channel, + body: CreateBulkRequest, +) -> Result { + if body.input.is_empty() { + return Err(BulkError::EmptyInput.into()); + } + + let payloads = body.input.iter().map(|email| { + let mut input = CheckEmailInput::new(email.to_string()); + if let Some(from_email) = &body.from_email { + input.set_from_email(from_email.clone()); + } + if let Some(hello_name) = &body.hello_name { + input.set_hello_name(hello_name.clone()); + } + if let Some(proxy) = &body.proxy { + input.set_proxy(proxy.clone()); + } + + CheckEmailPayload { + input, + webhook: body.webhook.clone(), + } + }); + + let n = payloads.len(); + let stream = futures::stream::iter(payloads); + + stream + .map::, _>(Ok) + .try_for_each_concurrent(10, |payload| { + let channel = channel.clone(); + let properties: lapin::protocol::basic::AMQPProperties = BasicProperties::default() + .with_content_type("application/json".into()) + .with_priority(1); + + async move { + let payload_u8 = serde_json::to_vec(&payload)?; + let queue_name = "check_email.Smtp"; // TODO We might want to make this configurable. + channel + .basic_publish( + "", + queue_name, + BasicPublishOptions::default(), + &payload_u8, + properties, + ) + .await? + .await?; + + debug!(target: LOG_TARGET, email=?payload.input.to_email, queue=?queue_name, "Enqueued"); + + Ok(()) + } + }) + .await?; + + Ok(warp::reply::json(&CreateBulkResponse { + message: format!("Successfully added {n} emails to the queue"), + })) +} + +/// Create the `POST /bulk` endpoint. +/// The endpoint accepts list of email address and creates +/// a new job to check them. +pub fn create_bulk_job( + o: Option, +) -> impl Filter + Clone { + warp::path!("v1" / "bulk") + .and(warp::post()) + .and(check_header()) + .and(with_channel(o)) + // When accepting a body, we want a JSON body (and to reject huge + // payloads)... + // TODO: Configure max size limit for a bulk job + .and(warp::body::content_length_limit(1024 * 16)) + .and(warp::body::json()) + .and_then(create_bulk_request) + // View access logs by setting `RUST_LOG=reacher_backend`. + .with(warp::log(LOG_TARGET)) +} + +/// Warp filter that extracts lapin Channel. +fn with_channel( + o: Option, +) -> impl Filter + Clone { + warp::any().and_then(move || { + let o = o.clone(); + async move { + if let Some(channel) = o { + Ok(channel) + } else { + Err(warp::reject::not_found()) + } + } + }) +} diff --git a/backend/src/http/v1/mod.rs b/backend/src/http/v1/mod.rs new file mode 100644 index 000000000..e975d14a1 --- /dev/null +++ b/backend/src/http/v1/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "worker")] +pub mod bulk; diff --git a/backend/src/main.rs b/backend/src/main.rs index 9872c75e4..2270f46df 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -17,11 +17,16 @@ //! Main entry point of the `reacher_backend` binary. It has two `main` //! functions, depending on whether the `bulk` feature is enabled or not. +#[cfg(feature = "worker")] +use std::env; + use check_if_email_exists::LOG_TARGET; #[cfg(feature = "worker")] use futures::try_join; use tracing::info; +#[cfg(feature = "worker")] +use reacher_backend::worker::create_channel; #[cfg(feature = "worker")] use reacher_backend::worker::run_worker; use reacher_backend::{ @@ -41,10 +46,26 @@ async fn main() -> Result<(), Box> { // Setup sentry bug tracking. let _guard: sentry::ClientInitGuard = setup_sentry(); - let _http_server = run_warp_server(); + #[cfg(feature = "worker")] + let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set"); + + #[cfg(feature = "worker")] + let channel = { Some(create_channel(&backend_name).await?) }; + #[cfg(not(feature = "worker"))] + let channel = None; + + let _http_server = run_warp_server(channel.clone()); #[cfg(feature = "worker")] - try_join!(_http_server, run_worker())?; + try_join!( + _http_server, + run_worker( + channel.expect("If worker feature is set, channel is set."), + &backend_name + ) + )?; + #[cfg(not(feature = "worker"))] + _http_server.await?; Ok(()) } diff --git a/backend/src/worker/check_email.rs b/backend/src/worker/check_email.rs index cfd07d342..bae27021d 100644 --- a/backend/src/worker/check_email.rs +++ b/backend/src/worker/check_email.rs @@ -23,13 +23,13 @@ use tracing::{debug, info}; use crate::check::check_email; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct CheckEmailPayload { pub input: CheckEmailInput, pub webhook: Option, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct CheckEmailWebhook { pub url: String, pub extra: serde_json::Value, diff --git a/backend/src/worker/mod.rs b/backend/src/worker/mod.rs index 7ada8540a..d4e78a0dd 100644 --- a/backend/src/worker/mod.rs +++ b/backend/src/worker/mod.rs @@ -18,28 +18,25 @@ use std::env; use check_if_email_exists::LOG_TARGET; use futures_lite::StreamExt; -use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties}; +use lapin::{options::*, types::FieldTable, Channel, Connection, ConnectionProperties}; use tracing::{error, info}; -mod check_email; +pub mod check_email; use check_email::process_check_email; -pub async fn run_worker() -> Result<(), Box> { +pub async fn create_channel( + backend_name: &str, +) -> Result> { // Make sure the worker is well configured. let addr = env::var("RCH_AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672".into()); - let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set"); - let verif_method: VerifMethod = env::var("RCH_VERIF_METHOD") - .expect("RCH_VERIF_METHOD is not set") - .as_str() - .into(); let options = ConnectionProperties::default() // Use tokio executor and reactor. // At the moment the reactor is only available for unix (ref: https://github.com/amqp-rs/reactor-trait/issues/1) .with_executor(tokio_executor_trait::Tokio::current()) .with_reactor(tokio_reactor_trait::Tokio) - .with_connection_name(backend_name.clone().into()); + .with_connection_name(backend_name.into()); let conn = Connection::connect(&addr, options).await?; @@ -54,6 +51,17 @@ pub async fn run_worker() -> Result<(), Box .await?; info!(target: LOG_TARGET, backend=?backend_name,state=?conn.status().state(), concurrency=?concurrency, "Connected to AMQP broker"); + Ok(channel) +} + +pub async fn run_worker( + channel: Channel, + backend_name: &str, +) -> Result<(), Box> { + let verif_method: VerifMethod = env::var("RCH_VERIF_METHOD") + .map(|s| s.as_str().into()) + .unwrap_or(VerifMethod::Smtp); + // Create queue "check_email.{Smtp,Headless}" with priority. let queue_name = format!("check_email.{:?}", verif_method); let mut queue_args = FieldTable::default(); @@ -74,7 +82,7 @@ pub async fn run_worker() -> Result<(), Box let mut consumer = channel .basic_consume( &queue_name, - &backend_name, + backend_name, BasicConsumeOptions::default(), FieldTable::default(), ) diff --git a/backend/tests/check_email.rs b/backend/tests/check_email.rs index dbe374994..979047ce9 100644 --- a/backend/tests/check_email.rs +++ b/backend/tests/check_email.rs @@ -34,7 +34,7 @@ async fn test_input_foo_bar() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar"}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body()); @@ -50,7 +50,7 @@ async fn test_input_foo_bar_baz() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar.baz"}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body()); @@ -65,7 +65,7 @@ async fn test_reacher_secret_missing_header() { .path("/v0/check_email") .method("POST") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar.baz"}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body()); @@ -81,7 +81,7 @@ async fn test_reacher_secret_wrong_secret() { .method("POST") .header(REACHER_SECRET_HEADER, "barbaz") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar.baz"}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body()); @@ -97,7 +97,7 @@ async fn test_reacher_secret_correct_secret() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": "foo@bar"}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::OK, "{:?}", resp.body()); @@ -113,7 +113,7 @@ async fn test_reacher_to_mail_empty() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{"to_email": ""}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body()); @@ -129,7 +129,7 @@ async fn test_reacher_to_mail_missing() { .method("POST") .header(REACHER_SECRET_HEADER, "foobar") .json(&serde_json::from_str::(r#"{}"#).unwrap()) - .reply(&create_routes(None)) + .reply(&create_routes(None, None)) .await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST, "{:?}", resp.body());