From 34b1db8193d456eb17fae25914714ba243b14ccf Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Sat, 30 Nov 2024 15:44:59 +0100 Subject: [PATCH] refactor(backend): Remove "preprocess" queue (#1539) * Remove preprocess * Fix build * remove unused --- backend/backend_config.toml | 22 +- backend/src/config.rs | 227 ++---------------- backend/src/http/error.rs | 6 + backend/src/http/v0/check_email/post.rs | 2 +- backend/src/http/v1/bulk/post.rs | 87 ++++--- backend/src/http/v1/check_email/post.rs | 17 +- backend/src/http/v1/mod.rs | 25 -- backend/src/worker/consume.rs | 223 +++++++---------- .../src/worker/{check_email.rs => do_work.rs} | 57 ++--- backend/src/worker/mod.rs | 6 +- backend/src/worker/preprocess.rs | 88 ------- backend/src/worker/response.rs | 14 +- docker-compose.local.yaml | 14 +- docker-compose.yaml | 16 +- 14 files changed, 212 insertions(+), 592 deletions(-) rename backend/src/worker/{check_email.rs => do_work.rs} (81%) delete mode 100644 backend/src/worker/preprocess.rs diff --git a/backend/backend_config.toml b/backend/backend_config.toml index f009a5acf..b4f6fbaf9 100644 --- a/backend/backend_config.toml +++ b/backend/backend_config.toml @@ -95,28 +95,10 @@ enable = true # Env variable: RCH__WORKER__RABBITMQ__URL url = "amqp://guest:guest@localhost:5672" -# Queues to consume emails from. By default, the worker consumes from all -# queues. -# -# To consume from only a subset of queues, uncomment the line `queues = "all"` -# and specify the queues you want to consume from. -# -# Below is the exhaustive list of queue names that the worker can consume from: -# - "check.gmail": subscribe exclusively to Gmail emails. -# - "check.hotmailb2b": subscribe exclusively to Hotmail B2B emails. -# - "check.hotmailb2c": subscribe exclusively to Hotmail B2C emails. -# - "check.yahoo": subscribe exclusively to Yahoo emails. -# - "check.everything_else": subscribe to all emails that are not Gmail, Yahoo, or Hotmail. -# -# Env variable: RCH__WORKER__RABBITMQ__QUEUES -# -# queues = ["check.gmail", "check.hotmailb2b", "check.hotmailb2c", "check.yahoo", "check.everything_else"] -queues = "all" - -# Number of concurrent emails to verify for this worker across all queues. +# Number of concurrent emails to verify for this worker. # # Env variable: RCH__WORKER__RABBITMQ__CONCURRENCY -concurrency = 20 +concurrency = 5 # Throttle the maximum number of requests per second, per minute, per hour, and # per day for this worker. 
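Every key in backend_config.toml above can also be set through the RCH__-prefixed environment variables mentioned in the comments, because load_config() in backend/src/config.rs (below) layers a config::Environment source on top of the TOML File source, with "__" as the nesting separator. A minimal sketch of that layering, assuming the config and serde crates; the RabbitMQ struct here is an illustrative stand-in, not the real BackendConfig:

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct RabbitMQ {
        url: String,
        concurrency: u16,
    }

    fn main() -> Result<(), config::ConfigError> {
        // "RCH" is the prefix and "__" separates nesting levels, so this
        // variable maps to the `worker.rabbitmq.concurrency` key.
        std::env::set_var("RCH__WORKER__RABBITMQ__CONCURRENCY", "10");

        let cfg = config::Config::builder()
            // Reads backend_config.toml from the working directory.
            .add_source(config::File::with_name("backend_config"))
            // The env source is layered second, so it overrides the file.
            .add_source(config::Environment::with_prefix("RCH").separator("__"))
            .build()?;

        // The env value (10) wins over the file's `concurrency = 5`.
        let rabbitmq: RabbitMQ = cfg.get("worker.rabbitmq")?;
        assert_eq!(rabbitmq.concurrency, 10);
        Ok(())
    }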
diff --git a/backend/src/config.rs b/backend/src/config.rs index a9031284a..c1edc8228 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -16,7 +16,7 @@ use crate::create_db; #[cfg(feature = "worker")] -use crate::worker::check_email::TaskWebhook; +use crate::worker::do_work::TaskWebhook; #[cfg(feature = "worker")] use crate::worker::setup_rabbit_mq; use anyhow::bail; @@ -27,12 +27,11 @@ use check_if_email_exists::{ use config::Config; #[cfg(feature = "worker")] use lapin::Channel; -use serde::de::{self, Deserializer, Visitor}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use std::env; #[cfg(feature = "worker")] use std::sync::Arc; -use std::{env, fmt}; use tracing::warn; #[derive(Debug, Default, Serialize, Deserialize)] @@ -70,10 +69,7 @@ pub struct BackendConfig { pg_pool: Option<PgPool>, #[cfg(feature = "worker")] #[serde(skip)] - check_email_channel: Option<Arc<Channel>>, - #[cfg(feature = "worker")] - #[serde(skip)] - preprocess_channel: Option<Arc<Channel>>, + channel: Option<Arc<Channel>>, } impl BackendConfig { @@ -90,9 +86,7 @@ impl BackendConfig { &self.worker.postgres, &self.pg_pool, #[cfg(feature = "worker")] - &self.check_email_channel, - #[cfg(feature = "worker")] - &self.preprocess_channel, + &self.channel, ) { #[cfg(feature = "worker")] ( @@ -101,14 +95,11 @@ impl BackendConfig { Some(rabbitmq), Some(postgres), Some(pg_pool), - Some(check_email_channel), - Some(preprocess_channel), + Some(channel), ) => Ok(MustWorkerConfig { pg_pool: pg_pool.clone(), #[cfg(feature = "worker")] - check_email_channel: check_email_channel.clone(), - #[cfg(feature = "worker")] - preprocess_channel: preprocess_channel.clone(), + channel: channel.clone(), throttle: throttle.clone(), rabbitmq: rabbitmq.clone(), #[cfg(feature = "worker")] @@ -116,14 +107,13 @@ impl BackendConfig { postgres: postgres.clone(), }), #[cfg(feature = "worker")] - (true, _, _, _, _, _, _) => bail!("Worker configuration is missing"), + (true, _, _, _, _, _) => bail!("Worker configuration is missing"), _ => bail!("Calling must_worker_config on a non-worker backend"), } } /// Attempt connection to the Postgres database and RabbitMQ. Also populates - /// the internal `pg_pool1, `check_email_channel` and `preprocess_channel` - /// fields with the connections. + /// the internal `pg_pool` and `channel` fields with the connections.
pub async fn connect(&mut self) -> Result<(), anyhow::Error> { let pg_pool = if self.worker.enable { let db_url = self @@ -145,21 +135,16 @@ impl BackendConfig { #[cfg(feature = "worker")] { - let (check_email_channel, preprocess_channel) = if self.worker.enable { + let channel = if self.worker.enable { let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| { anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration") })?; - let (check_email_channel, preprocess_channel) = - setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?; - ( - Some(Arc::new(check_email_channel)), - Some(Arc::new(preprocess_channel)), - ) + let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?; + Some(Arc::new(channel)) } else { - (None, None) + None }; - self.check_email_channel = check_email_channel; - self.preprocess_channel = preprocess_channel; + self.channel = channel; } Ok(()) @@ -168,16 +153,6 @@ impl BackendConfig { pub fn get_pg_pool(&self) -> Option<PgPool> { self.pg_pool.clone() } - - #[cfg(feature = "worker")] - pub fn get_check_email_channel(&self) -> Option<Arc<Channel>> { - self.check_email_channel.clone() - } - - #[cfg(feature = "worker")] - pub fn get_preprocess_channel(&self) -> Option<Arc<Channel>> { - self.preprocess_channel.clone() - } } #[derive(Debug, Default, Deserialize, Clone, Serialize)] @@ -213,9 +188,7 @@ pub struct WorkerConfig { pub struct MustWorkerConfig { pub pg_pool: PgPool, #[cfg(feature = "worker")] - pub check_email_channel: Arc<Channel>, - #[cfg(feature = "worker")] - pub preprocess_channel: Arc<Channel>, + pub channel: Arc<Channel>, pub throttle: ThrottleConfig, pub rabbitmq: RabbitMQConfig, @@ -224,162 +197,13 @@ pub struct MustWorkerConfig { pub postgres: PostgresConfig, } -#[derive(Debug, Clone, Serialize)] -pub enum RabbitMQQueues { - All, - Only(Vec<Queue>), -} - -/// Deserialize RabbitMQQueues from a string or a list of strings. -/// If the value is "all", then we return RabbitMQQueues::All. -/// If the value is a list of strings, then we return RabbitMQQueues::Only. -impl<'de> Deserialize<'de> for RabbitMQQueues { - fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> - where - D: Deserializer<'de>, - { - struct RabbitMQQueuesVisitor; - - impl<'de> Visitor<'de> for RabbitMQQueuesVisitor { - type Value = RabbitMQQueues; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a string 'all' or a list of queue strings") - } - - fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> - where - E: de::Error, - { - if value == "all" { - Ok(RabbitMQQueues::All) - } else { - Err(de::Error::unknown_variant(value, &["all"])) - } - } - - fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> - where - A: serde::de::SeqAccess<'de>, - { - let mut queues = Vec::new(); - while let Some(value) = seq.next_element()? { - queues.push(value); - } - Ok(RabbitMQQueues::Only(queues)) - } - } - - deserializer.deserialize_any(RabbitMQQueuesVisitor) - } -} - -impl RabbitMQQueues { - pub fn to_queues(&self) -> Vec<Queue> { - match self { - RabbitMQQueues::All => vec![ - Queue::Gmail, - Queue::HotmailB2B, - Queue::HotmailB2C, - Queue::Yahoo, - Queue::EverythingElse, - ], - RabbitMQQueues::Only(queues) => queues.clone(), - } - } -} - #[derive(Debug, Deserialize, Clone, Serialize)] pub struct RabbitMQConfig { pub url: String, - /// Queues to consume emails from. By default the worker consumes from all - /// queues. - /// - /// If you wish to consume from only a subset of queues, you can uncomment - /// the line `queues = "all"`, and then specify the queues you want to - /// consume from.
- /// - /// Below is the exhaustive list of queue names that the worker can consume from: - /// - "check.gmail": subcribe exclusively to Gmail emails. - /// - "check.hotmailb2b": subcribe exclusively to Hotmail B2B emails. - /// - "check.hotmailb2c": subcribe exclusively to Hotmail B2C emails. - /// - "check.yahoo": subcribe exclusively to Yahoo emails. - /// - "check.everything_else": subcribe to all emails that are not Gmail, Yahoo, or Hotmail. - /// - /// queues = ["check.gmail", "check.hotmailb2b", "check.hotmailb2c", "check.yahoo", "check.everything_else"] - pub queues: RabbitMQQueues, - /// Total number of concurrent messages that the worker can process, across - /// all queues. + /// Total number of concurrent messages that the worker can process. pub concurrency: u16, } -/// Queue names that the worker can consume from. Each email is routed to a -/// one and only one queue, based on the email provider. A single worker can -/// consume from multiple queues. -#[derive(Debug, Clone, PartialEq, Serialize)] -pub enum Queue { - Gmail, - HotmailB2B, - HotmailB2C, - Yahoo, - EverythingElse, -} - -impl fmt::Display for Queue { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Queue::Gmail => write!(f, "check.gmail"), - Queue::HotmailB2B => write!(f, "check.hotmailb2b"), - Queue::HotmailB2C => write!(f, "check.hotmailb2c"), - Queue::Yahoo => write!(f, "check.yahoo"), - Queue::EverythingElse => write!(f, "check.everything_else"), - } - } -} - -// Implement Deserialize for the Queue enum -impl<'de> Deserialize<'de> for Queue { - fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> - where - D: Deserializer<'de>, - { - struct QueueVisitor; - - impl<'de> Visitor<'de> for QueueVisitor { - type Value = Queue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a valid queue string") - } - - fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> - where - E: de::Error, - { - match value { - "check.gmail" => Ok(Queue::Gmail), - "check.hotmailb2b" => Ok(Queue::HotmailB2B), - "check.hotmailb2c" => Ok(Queue::HotmailB2C), - "check.yahoo" => Ok(Queue::Yahoo), - "check.everything_else" => Ok(Queue::EverythingElse), - _ => Err(de::Error::unknown_variant( - value, - &[ - "check.gmail", - "check.hotmailb2b", - "check.hotmailb2c", - "check.yahoo", - "check.everything_else", - ], - )), - } - } - } - - deserializer.deserialize_str(QueueVisitor) - } -} - #[derive(Debug, Deserialize, Clone, Serialize)] pub struct PostgresConfig { pub db_url: String, @@ -408,19 +232,10 @@ impl ThrottleConfig { /// Load the worker configuration from the worker_config.toml file and from the /// environment. pub async fn load_config() -> Result<BackendConfig, anyhow::Error> { - let mut cfg = Config::builder() + let cfg = Config::builder() .add_source(config::File::with_name("backend_config")) .add_source(config::Environment::with_prefix("RCH").separator("__")); - // The RCH__WORKER__RABBITMQ__QUEUES is always read as a str, whereas we - // sometimes want to parse it as a list of strings, if it's not "all". We - // handle this case separately.
- if let Ok(queues) = env::var("RCH__WORKER__RABBITMQ__QUEUES") { - if queues != "all" { - let queues: Vec<String> = queues.split(',').map(String::from).collect(); - cfg = cfg.set_override("worker.rabbitmq.queues", queues)?; - } - } let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?; if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) { @@ -432,21 +247,13 @@ pub async fn load_config() -> Result<BackendConfig, anyhow::Error> { #[cfg(test)] mod tests { - use super::{load_config, Queue}; + use super::load_config; use std::env; #[tokio::test] async fn test_env_vars() { env::set_var("RCH__BACKEND_NAME", "test-backend"); - env::set_var( - "RCH__WORKER__RABBITMQ__QUEUES", - "check.gmail,check.hotmailb2b", - ); let cfg = load_config().await.unwrap(); assert_eq!(cfg.backend_name, "test-backend"); - assert_eq!( - cfg.worker.rabbitmq.unwrap().queues.to_queues(), - vec![Queue::Gmail, Queue::HotmailB2B] - ); } } diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs index 207e7daee..ac4c01278 100644 --- a/backend/src/http/error.rs +++ b/backend/src/http/error.rs @@ -108,6 +108,12 @@ impl From for ReacherResponseError { } } +impl From<anyhow::Error> for ReacherResponseError { + fn from(e: anyhow::Error) -> Self { + ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e) + } +} + /// This function receives a `Rejection` and tries to return a custom value, /// otherwise simply passes the rejection along. pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> { diff --git a/backend/src/http/v0/check_email/post.rs b/backend/src/http/v0/check_email/post.rs index 9f52d352d..7f774b0d7 100644 --- a/backend/src/http/v0/check_email/post.rs +++ b/backend/src/http/v0/check_email/post.rs @@ -29,7 +29,7 @@ use crate::config::BackendConfig; use crate::http::{check_header, ReacherResponseError}; /// The request body for the `POST /v0/check_email` endpoint. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize)] pub struct CheckEmailRequest { pub to_email: String, pub from_email: Option<String>, diff --git a/backend/src/http/v1/bulk/post.rs b/backend/src/http/v1/bulk/post.rs index 3b34ac432..abe05e4a0 100644 --- a/backend/src/http/v1/bulk/post.rs +++ b/backend/src/http/v1/bulk/post.rs @@ -25,20 +25,20 @@ use lapin::Channel; use lapin::{options::*, BasicProperties}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; -use tracing::info; +use tracing::{debug, info}; use warp::http::StatusCode; use warp::Filter; use crate::config::BackendConfig; use crate::http::check_header; -use crate::http::v1::with_channel; +use crate::http::v0::check_email::post::with_config; use crate::http::with_db; use crate::http::CheckEmailRequest; use crate::http::ReacherResponseError; -use crate::worker::check_email::TaskWebhook; -use crate::worker::preprocess::PreprocessTask; - -const PREPROCESS_QUEUE: &str = "preprocess"; +use crate::worker::consume::CHECK_EMAIL_QUEUE; +use crate::worker::do_work::CheckEmailJobId; +use crate::worker::do_work::CheckEmailTask; +use crate::worker::do_work::TaskWebhook; /// POST v1/bulk endpoint request body.
#[derive(Debug, Deserialize)] @@ -54,7 +54,7 @@ struct Response { } async fn http_handler( - channel: Arc<Channel>, + config: Arc<BackendConfig>, pg_pool: PgPool, body: Request, ) -> Result<impl warp::Reply, warp::Rejection> { @@ -75,64 +75,63 @@ async fn http_handler( .await .map_err(ReacherResponseError::from)?; - let payloads = body.input.iter().map(|email| { - let input = CheckEmailRequest { - to_email: email.clone(), - from_email: None, - hello_name: None, - proxy: None, - gmail_verif_method: None, - hotmailb2b_verif_method: None, - hotmailb2c_verif_method: None, - yahoo_verif_method: None, - smtp_port: None, - }; - - Ok(PreprocessTask { - input, - job_id: Some(rec.id), - webhook: body.webhook.clone(), - }) - }); - let payloads = payloads.collect::<Result<Vec<_>, ReacherResponseError>>()?; - - let n = payloads.len(); - let stream = futures::stream::iter(payloads); + let n = body.input.len(); + let webhook = body.webhook.clone(); + let stream = futures::stream::iter(body.input.into_iter()); let properties = BasicProperties::default() .with_content_type("application/json".into()) - .with_priority(1); + .with_priority(1); // Low priority stream .map::<Result<_, ReacherResponseError>, _>(Ok) - .try_for_each_concurrent(10, |payload| async { - publish_task(Arc::clone(&channel), payload, properties.clone()).await + // Publish tasks to the queue, 10 at a time. + .try_for_each_concurrent(10, |to_email| async { + let input = CheckEmailRequest { + to_email, + ..Default::default() + } + .to_check_email_input(Arc::clone(&config)); + + let task = CheckEmailTask { + input, + job_id: CheckEmailJobId::Bulk(rec.id), + webhook: webhook.clone(), + }; + + publish_task( + config + .must_worker_config() + .map_err(ReacherResponseError::from)? + .channel, + task, + properties.clone(), + ) + .await }) .await?; info!( target: LOG_TARGET, - queue = PREPROCESS_QUEUE, - "Added {n} emails to the queue", + queue = CHECK_EMAIL_QUEUE, + "Added {n} emails", ); Ok(warp::reply::json(&Response { job_id: rec.id })) } -/// Publish a task to the "preprocess" queue. +/// Publish a task to the "check_email" queue. pub async fn publish_task( channel: Arc<Channel>, - payload: PreprocessTask, + task: CheckEmailTask, properties: BasicProperties, ) -> Result<(), ReacherResponseError> { - let channel = Arc::clone(&channel); - - let payload_u8 = serde_json::to_vec(&payload)?; + let task_json = serde_json::to_vec(&task)?; channel .basic_publish( "", - PREPROCESS_QUEUE, + CHECK_EMAIL_QUEUE, BasicPublishOptions::default(), - &payload_u8, + &task_json, properties, ) .await @@ -140,7 +139,7 @@ pub async fn publish_task( .await .map_err(ReacherResponseError::from)?; - info!(target: LOG_TARGET, email=?payload.input.to_email, queue=?PREPROCESS_QUEUE, "Published task"); + debug!(target: LOG_TARGET, email=?task.input.to_email, queue=?CHECK_EMAIL_QUEUE, "Published task"); Ok(()) } @@ -154,7 +153,7 @@ pub fn v1_create_bulk_job( warp::path!("v1" / "bulk") .and(warp::post()) .and(check_header(Arc::clone(&config))) - .and(with_channel(config.get_preprocess_channel())) + .and(with_config(Arc::clone(&config))) .and(with_db(config.get_pg_pool())) // When accepting a body, we want a JSON body (and to reject huge // payloads)...
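With the preprocess hop removed, the bulk handler above fans every email out to the single "check_email" queue through one basic_publish on the default exchange. A self-contained sketch of that publish path, assuming lapin 2.x with the tokio executor and a broker on localhost; the Task struct is an illustrative stand-in for the real CheckEmailTask:

    use lapin::{options::BasicPublishOptions, BasicProperties, Connection, ConnectionProperties};
    use serde::Serialize;

    #[derive(Serialize)]
    struct Task {
        to_email: String,
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let conn = Connection::connect(
            "amqp://guest:guest@localhost:5672",
            ConnectionProperties::default(),
        )
        .await?;
        let channel = conn.create_channel().await?;

        let task = Task { to_email: "someone@gmail.com".into() };
        let payload = serde_json::to_vec(&task)?;

        // Mirrors publish_task above: default exchange (""), routing key equal
        // to the queue name, and priority 1 so that tasks published with a
        // higher priority are delivered first.
        channel
            .basic_publish(
                "",
                "check_email",
                BasicPublishOptions::default(),
                &payload,
                BasicProperties::default()
                    .with_content_type("application/json".into())
                    .with_priority(1),
            )
            .await? // the publish itself
            .await?; // the broker's confirmation
        Ok(())
    }

Provider-based routing now happens inside the worker at verification time, so the publisher no longer performs MX lookups before enqueueing.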
diff --git a/backend/src/http/v1/check_email/post.rs b/backend/src/http/v1/check_email/post.rs index b828d2aed..97c383ea8 100644 --- a/backend/src/http/v1/check_email/post.rs +++ b/backend/src/http/v1/check_email/post.rs @@ -22,7 +22,7 @@ use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicRejectOptions, QueueDeclareOptions, }; use lapin::types::FieldTable; -use lapin::{BasicProperties, Channel}; +use lapin::BasicProperties; use std::sync::Arc; use warp::http::StatusCode; use warp::{http, Filter}; @@ -30,16 +30,14 @@ use warp::{http, Filter}; use crate::config::BackendConfig; use crate::http::v0::check_email::post::{with_config, CheckEmailRequest}; use crate::http::v1::bulk::post::publish_task; -use crate::http::v1::with_channel; use crate::http::{check_header, ReacherResponseError}; use crate::worker::consume::MAX_QUEUE_PRIORITY; -use crate::worker::preprocess::PreprocessTask; +use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask}; use crate::worker::response::SingleShotReply; /// The main endpoint handler that implements the logic of this route. async fn http_handler( config: Arc<BackendConfig>, - channel: Arc<Channel>, body: CheckEmailRequest, ) -> Result<impl warp::Reply, warp::Rejection> { // The to_email field must be present @@ -61,6 +59,10 @@ async fn http_handler( "application/json", )); } + let channel = config + .must_worker_config() + .map_err(ReacherResponseError::from)? + .channel; // Follow this RPC tutorial: // https://www.rabbitmq.com/tutorials/tutorial-six-javascript#callback-queue @@ -87,9 +89,9 @@ async fn http_handler( publish_task( channel.clone(), - PreprocessTask { - input: body, - job_id: None, + CheckEmailTask { + input: body.to_check_email_input(config), + job_id: CheckEmailJobId::SingleShot, webhook: None, }, properties, @@ -169,7 +171,6 @@ pub fn v1_check_email( .and(warp::post()) .and(check_header(Arc::clone(&config))) .and(with_config(config.clone())) - .and(with_channel(config.get_preprocess_channel())) // When accepting a body, we want a JSON body (and to reject huge // payloads)... .and(warp::body::content_length_limit(1024 * 16)) diff --git a/backend/src/http/v1/mod.rs b/backend/src/http/v1/mod.rs index 3fdfe8dfb..7d4bf4f95 100644 --- a/backend/src/http/v1/mod.rs +++ b/backend/src/http/v1/mod.rs @@ -14,30 +14,5 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. -use lapin::Channel; -use std::sync::Arc; -use warp::http::StatusCode; -use warp::Filter; - -use super::ReacherResponseError; - pub mod bulk; pub mod check_email; - -/// Warp filter that extracts lapin Channel, or returns a 503 error if it's not -/// available. -pub fn with_channel( - channel: Option<Arc<Channel>>, -) -> impl Filter<Extract = (Arc<Channel>,), Error = warp::Rejection> + Clone { - warp::any().and_then(move || { - let channel = channel.clone(); - async move { - channel.ok_or_else(|| { - warp::reject::custom(ReacherResponseError::new( - StatusCode::SERVICE_UNAVAILABLE, - "Please configure a RabbitMQ instance on Reacher before calling this endpoint", - )) - }) - } - }) -} diff --git a/backend/src/worker/consume.rs b/backend/src/worker/consume.rs index ca71280b7..5dc0d5b78 100644 --- a/backend/src/worker/consume.rs +++ b/backend/src/worker/consume.rs @@ -14,10 +14,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>.
-use super::check_email::{do_check_email_work, CheckEmailTask, TaskError}; -use super::preprocess::{do_preprocess_work, PreprocessTask}; +use super::do_work::{do_check_email_work, CheckEmailTask, TaskError}; use super::response::send_single_shot_reply; -use crate::config::{BackendConfig, RabbitMQConfig, RabbitMQQueues, ThrottleConfig}; +use crate::config::{BackendConfig, RabbitMQConfig, ThrottleConfig}; +use crate::worker::do_work::CheckEmailJobId; use anyhow::Context; use check_if_email_exists::LOG_TARGET; use futures::stream::StreamExt; @@ -27,21 +27,20 @@ use std::time::{Duration, Instant}; use tokio::sync::Mutex; use tracing::{debug, error, info}; +/// Our RabbitMQ only has one queue: "check_email". +pub const CHECK_EMAIL_QUEUE: &str = "check_email"; pub const MAX_QUEUE_PRIORITY: u8 = 5; -/// Set up the RabbitMQ connection and declare all queues. This creates two -/// channels, one for checking emails and one for preprocessing. The -/// preprocessing channel is used to figure out the email provider and route -/// the message to the correct queue. +/// Set up the RabbitMQ connection and declare the "check_email" queue. /// -/// The check channel is used to consume messages from the queues. It has a +/// The check channel is used to consume messages from the queue. It has a /// global prefetch limit set to the concurrency limit. /// -/// Returns a tuple of (check_channel, preprocess_channel). +/// Returns the lapin Channel. pub async fn setup_rabbit_mq( backend_name: &str, config: &RabbitMQConfig, -) -> Result<(Channel, Channel), anyhow::Error> { +) -> Result<Channel, anyhow::Error> { let options = ConnectionProperties::default() // Use tokio executor and reactor. .with_executor(tokio_executor_trait::Tokio::current()) @@ -51,185 +50,129 @@ pub async fn setup_rabbit_mq( let conn = Connection::connect(&config.url, options) .await .with_context(|| format!("Connecting to rabbitmq {}", &config.url))?; - let check_channel = conn.create_channel().await?; - let preprocess_channel = conn.create_channel().await?; + let channel = conn.create_channel().await?; info!(target: LOG_TARGET, backend=?backend_name,state=?conn.status().state(), "Connected to AMQP broker"); let mut queue_args = FieldTable::default(); queue_args.insert("x-max-priority".into(), MAX_QUEUE_PRIORITY.into()); - // Assert all queues are declared. - for queue in RabbitMQQueues::All.to_queues().iter() { - check_channel - .queue_declare( - format!("{}", queue).as_str(), - QueueDeclareOptions { - durable: true, - ..Default::default() - }, - queue_args.clone(), - ) - .await?; - } + // Assert the queue is declared. + channel + .queue_declare( + CHECK_EMAIL_QUEUE, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + queue_args.clone(), + ) + .await?; // Set up prefetch (concurrency) limit using qos - check_channel + channel .basic_qos( config.concurrency, - // Set global to true to apply to all consumers. + // Set global to true to apply to all consumers, even though in our + // case there's only one consumer.
// ref: https://www.rabbitmq.com/docs/consumer-prefetch#independent-consumers BasicQosOptions { global: true }, ) .await?; - preprocess_channel - .queue_declare( - "preprocess", - QueueDeclareOptions { - durable: true, - ..Default::default() - }, - queue_args, - ) - .await?; - - info!(target: LOG_TARGET, queues=?config.queues.to_queues(), concurrency=?config.concurrency, "Worker will start consuming messages"); + info!(target: LOG_TARGET, queues=?CHECK_EMAIL_QUEUE, concurrency=?config.concurrency, "Worker will start consuming messages"); - Ok((check_channel, preprocess_channel)) + Ok(channel) } /// Start the worker to consume messages from the queue. pub async fn run_worker(config: Arc<BackendConfig>) -> Result<(), anyhow::Error> { - tokio::try_join!( - consume_preprocess(Arc::clone(&config)), - consume_check_email(config) - )?; - - Ok(()) -} - -/// Consume "Preprocess" queue, by figuring out the email provider and routing -/// (i.e. re-publishing) to the correct queue. -async fn consume_preprocess(config: Arc<BackendConfig>) -> Result<(), anyhow::Error> { - let preprocess_channel = config.must_worker_config()?.preprocess_channel; - let check_email_channel = config.must_worker_config()?.check_email_channel; - - let mut consumer = preprocess_channel - .basic_consume( - "preprocess", - format!("{}-preprocess", &config.backend_name).as_str(), - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await?; - - // Loop over the incoming messages - while let Some(delivery) = consumer.next().await { - let delivery = delivery?; - let payload = serde_json::from_slice::<PreprocessTask>(&delivery.data)?; - debug!(target: LOG_TARGET, email=payload.input.to_email, "New Preprocess job"); - - let channel_clone = Arc::clone(&check_email_channel); - let config_clone = Arc::clone(&config); - - tokio::spawn(async move { - if let Err(e) = - do_preprocess_work(&payload, delivery, channel_clone, config_clone).await - { - error!(target: LOG_TARGET, email=payload.input.to_email, error=?e, "Error preprocessing message"); - } - }); - } - Ok(()) + consume_check_email(config).await } +/// Consume "check_email" queue.
async fn consume_check_email(config: Arc<BackendConfig>) -> Result<(), anyhow::Error> { let config_clone = Arc::clone(&config); let worker_config = config_clone.must_worker_config()?; - let channel = worker_config.check_email_channel; + let channel = worker_config.channel; let throttle = Arc::new(Mutex::new(Throttle::new())); - for queue in worker_config.rabbitmq.queues.to_queues() { - let channel_clone = Arc::clone(&channel); - let config_clone = Arc::clone(&config); - let throttle_clone = Arc::clone(&throttle); - let queue_clone = queue.clone(); - - tokio::spawn(async move { - let worker_config = config_clone.must_worker_config()?; - - let mut consumer = channel_clone - .basic_consume( - queue_clone.to_string().as_str(), - format!("{}-{}", &config_clone.backend_name, &queue_clone).as_str(), - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await?; - - // Loop over the incoming messages - while let Some(delivery) = consumer.next().await { - let delivery = delivery?; - let payload = serde_json::from_slice::<CheckEmailTask>(&delivery.data)?; - debug!(target: LOG_TARGET, queue=?queue_clone.to_string(), email=?payload.input.to_email, "Consuming message"); - - // Reset throttle counters if needed - throttle_clone.lock().await.reset_if_needed(); - - // Check if we should throttle before fetching the next message - if let Some(wait_duration) = throttle_clone - .lock() - .await - .should_throttle(&worker_config.throttle) - { - info!(target: LOG_TARGET, wait=?wait_duration, email=?payload.input.to_email, "Too many requests, throttling"); + tokio::spawn(async move { + let worker_config = config_clone.must_worker_config()?; + + let mut consumer = channel + .basic_consume( + CHECK_EMAIL_QUEUE, + format!("{}-{}", &config_clone.backend_name, CHECK_EMAIL_QUEUE).as_str(), + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + // Loop over the incoming messages + while let Some(delivery) = consumer.next().await { + let delivery = delivery?; + let payload = serde_json::from_slice::<CheckEmailTask>(&delivery.data)?; + debug!(target: LOG_TARGET, email=?payload.input.to_email, "Consuming message"); + + // Reset throttle counters if needed + throttle.lock().await.reset_if_needed(); - // For single-shot tasks, we return an error early, so that the user knows they need to retry. - if payload.is_single_shot() { - debug!(target: LOG_TARGET, email=payload.input.to_email, job_id=?payload.job_id, queue=?queue_clone.to_string(), "Rejecting single-shot email because of throttling"); + // Check if we should throttle before fetching the next message + if let Some(wait_duration) = throttle + .lock() + .await + .should_throttle(&worker_config.throttle) + { + info!(target: LOG_TARGET, wait=?wait_duration, email=?payload.input.to_email, "Too many requests, throttling"); + + // For single-shot tasks, we return an error early, so that the user knows they need to retry. + match payload.job_id { + CheckEmailJobId::SingleShot => { + debug!(target: LOG_TARGET, email=payload.input.to_email, job_id=?payload.job_id, "Rejecting single-shot email because of throttling"); delivery .reject(BasicRejectOptions { requeue: false }) .await?; send_single_shot_reply( - Arc::clone(&channel_clone), + Arc::clone(&channel), &delivery, &Err(TaskError::Throttle(wait_duration)), ) .await?; - } else { + } + CheckEmailJobId::Bulk(_) => { // Put back the message into the same queue, so that other // workers can pick it up.
delivery .reject(BasicRejectOptions { requeue: true }) .await?; - debug!(target: LOG_TARGET, email=payload.input.to_email, job_id=?payload.job_id, queue=?queue_clone.to_string(), "Requeued message because of throttling"); + debug!(target: LOG_TARGET, email=payload.input.to_email, job_id=?payload.job_id, "Requeued message because of throttling"); } - - continue; } + continue; + } - let config_clone2 = Arc::clone(&config_clone); - let channel_clone2 = Arc::clone(&channel_clone); + let config_clone2 = Arc::clone(&config_clone); + let channel_clone2 = Arc::clone(&channel); - info!(target: LOG_TARGET, email=payload.input.to_email, job_id=?payload.job_id, queue=?queue_clone.to_string(), "Starting task"); - tokio::spawn(async move { - if let Err(e) = - do_check_email_work(&payload, delivery, channel_clone2, config_clone2).await - { - error!(target: LOG_TARGET, email=payload.input.to_email, error=?e, "Error processing message"); - } - }); + info!(target: LOG_TARGET, email=payload.input.to_email, job_id=?payload.job_id, "Starting task"); + tokio::spawn(async move { + if let Err(e) = + do_check_email_work(&payload, delivery, channel_clone2, config_clone2).await + { + error!(target: LOG_TARGET, email=payload.input.to_email, error=?e, "Error processing message"); + } + }); - // Increment throttle counters once we spawn the task - throttle_clone.lock().await.increment_counters(); - } + // Increment throttle counters once we spawn the task + throttle.lock().await.increment_counters(); + } + + Ok::<(), anyhow::Error>(()) + }); - Ok::<(), anyhow::Error>(()) - }); - } Ok(()) } diff --git a/backend/src/worker/check_email.rs b/backend/src/worker/do_work.rs similarity index 81% rename from backend/src/worker/check_email.rs rename to backend/src/worker/do_work.rs index 40c2be4ab..5cc99b078 100644 --- a/backend/src/worker/check_email.rs +++ b/backend/src/worker/do_work.rs @@ -33,17 +33,16 @@ use warp::http::StatusCode; #[derive(Debug, Deserialize, Serialize)] pub struct CheckEmailTask { pub input: CheckEmailInput, - // If the task is a part of a job, then this field will be set. - pub job_id: Option<i32>, + pub job_id: CheckEmailJobId, pub webhook: Option<TaskWebhook>, } -impl CheckEmailTask { - /// Returns true if the task is a single-shot email verification via the - /// /v1/check_email, endpoint, i.e. not a part of a bulk verification job. - pub fn is_single_shot(&self) -> bool { - self.job_id.is_none() - } +#[derive(Debug, Deserialize, Serialize)] +pub enum CheckEmailJobId { + /// Single-shot email verification; it doesn't have an actual job id. + SingleShot, + /// Job id of the bulk verification. + Bulk(i32), } /// The errors that can occur when processing a task. @@ -112,12 +111,12 @@ struct WebhookOutput<'a> { /// Processes the check email task asynchronously.
pub(crate) async fn do_check_email_work( - payload: &CheckEmailTask, + task: &CheckEmailTask, delivery: Delivery, channel: Arc<Channel>, config: Arc<BackendConfig>, ) -> Result<(), anyhow::Error> { - let worker_output = inner_check_email(payload).await; + let worker_output = inner_check_email(task).await; match (&worker_output, delivery.redelivered) { (Ok(output), false) if output.is_reachable == Reachable::Unknown => { @@ -127,14 +126,14 @@ delivery .reject(BasicRejectOptions { requeue: true }) .await?; - info!(target: LOG_TARGET, email=?&payload.input.to_email, is_reachable=?Reachable::Unknown, "Requeued message"); + info!(target: LOG_TARGET, email=?&task.input.to_email, is_reachable=?Reachable::Unknown, "Requeued message"); } (Err(e), false) => { // Same as above, if processing the message failed, we requeue it. delivery .reject(BasicRejectOptions { requeue: true }) .await?; - info!(target: LOG_TARGET, email=?&payload.input.to_email, err=?e, "Requeued message"); + info!(target: LOG_TARGET, email=?&task.input.to_email, err=?e, "Requeued message"); } _ => { // This is the happy path. We acknowledge the message and: // - If it's a single-shot email verification, we send the reply to the reply_to queue. // - If it's a bulk verification, we save the result to the database. delivery.ack(BasicAckOptions::default()).await?; - if payload.is_single_shot() { - send_single_shot_reply(channel, &delivery, &worker_output).await?; - } else { - save_to_db( - &config.backend_name, - config.get_pg_pool(), - payload, - &worker_output, - ) - .await?; + match task.job_id { + CheckEmailJobId::SingleShot => { + send_single_shot_reply(channel, &delivery, &worker_output).await?; + } + CheckEmailJobId::Bulk(bulk_job_id) => { + save_to_db( + &config.backend_name, + config.get_pg_pool(), + task, + bulk_job_id, + &worker_output, + ) + .await?; + } } info!(target: LOG_TARGET, - email=payload.input.to_email, + email=task.input.to_email, worker_output=?worker_output.map(|o| o.is_reachable), - job_id=?payload.job_id, + job_id=?task.job_id, "Done check", ); } @@ -166,13 +169,13 @@ Ok(()) } -async fn inner_check_email(payload: &CheckEmailTask) -> Result<CheckEmailOutput, TaskError> { - let output = check_email(&payload.input).await; +async fn inner_check_email(task: &CheckEmailTask) -> Result<CheckEmailOutput, TaskError> { + let output = check_email(&task.input).await; // Check if we have a webhook to send the output to. if let Some(TaskWebhook { on_each_email: Some(webhook), - }) = &payload.webhook + }) = &task.webhook { let webhook_output = WebhookOutput { result: &output, diff --git a/backend/src/worker/mod.rs b/backend/src/worker/mod.rs index cc6ad6541..90b474c33 100644 --- a/backend/src/worker/mod.rs +++ b/backend/src/worker/mod.rs @@ -17,13 +17,11 @@ // Each file corresponds to one step in the worker pipeline. The worker pipeline // is defined as: // - consume from RabbitMQ -// - preprocess -// - check email +// - do the work (i.e.
check the email) // - send response (either to the reply_to queue or save to the database) -pub mod check_email; pub mod consume; -pub mod preprocess; +pub mod do_work; pub mod response; pub use consume::{run_worker, setup_rabbit_mq}; diff --git a/backend/src/worker/preprocess.rs b/backend/src/worker/preprocess.rs deleted file mode 100644 index 937b0b75e..000000000 --- a/backend/src/worker/preprocess.rs +++ /dev/null @@ -1,88 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -use super::check_email::CheckEmailTask; -use super::check_email::TaskWebhook; -use crate::config::{BackendConfig, Queue}; -use crate::http::CheckEmailRequest; -use anyhow::anyhow; -use check_if_email_exists::mx::check_mx; -use check_if_email_exists::syntax::check_syntax; -use check_if_email_exists::{is_gmail, is_hotmail_b2b, is_hotmail_b2c, is_yahoo, LOG_TARGET}; -use lapin::message::Delivery; -use lapin::{options::*, Channel}; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; -use std::sync::Arc; -use tracing::debug; - -#[derive(Debug, Deserialize, Serialize)] -pub struct PreprocessTask { - pub input: CheckEmailRequest, - // If the task is a part of a job, then this field will be set. - pub job_id: Option<i32>, - pub webhook: Option<TaskWebhook>, -} - -/// Preprocess the email and send it to the appropriate queue for verification. -pub async fn do_preprocess_work( - payload: &PreprocessTask, - delivery: Delivery, - channel: Arc<Channel>, - config: Arc<BackendConfig>, -) -> Result<(), anyhow::Error> { - let syntax = check_syntax(&payload.input.to_email); - let mx = check_mx(&syntax).await?; - // Get first hostname from MX records. - let mx_hostname = mx - .lookup? - .iter() - .next() - .ok_or_else(|| anyhow!("No MX records found"))? - .exchange() - .to_string(); - - let queue = match mx_hostname.as_str() { - hostname if is_gmail(hostname) => Queue::Gmail, - hostname if is_hotmail_b2b(hostname) => Queue::HotmailB2B, - hostname if is_hotmail_b2c(hostname) => Queue::HotmailB2C, - hostname if is_yahoo(hostname) => Queue::Yahoo, - _ => Queue::EverythingElse, - }; - let check_email_input = payload.input.to_check_email_input(config); - let check_email_task = CheckEmailTask { - input: check_email_input, - job_id: payload.job_id, - webhook: payload.webhook.clone(), - }; - let check_email_payload = serde_json::to_vec(&check_email_task)?; - - channel - .basic_publish( - "", - format!("{}", queue).as_str(), - BasicPublishOptions::default(), - &check_email_payload, - delivery.properties.clone(), - ) - .await?
- .await?; - - delivery.ack(BasicAckOptions::default()).await?; - debug!(target: LOG_TARGET, email=?payload.input.to_email, queue=?queue.to_string(), "Message do_preprocess_worked"); - - Ok(()) -} diff --git a/backend/src/worker/response.rs b/backend/src/worker/response.rs index 90a47a124..0d1d73d4a 100644 --- a/backend/src/worker/response.rs +++ b/backend/src/worker/response.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. -use super::check_email::{CheckEmailTask, TaskError}; +use super::do_work::{CheckEmailTask, TaskError}; use anyhow::bail; use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; use lapin::message::Delivery; @@ -37,13 +37,13 @@ use warp::http::StatusCode; pub async fn save_to_db( backend_name: &str, pg_pool: Option<PgPool>, - payload: &CheckEmailTask, + task: &CheckEmailTask, + bulk_job_id: i32, worker_output: &Result<CheckEmailOutput, TaskError>, ) -> Result<(), anyhow::Error> { let pg_pool = pg_pool.ok_or_else(|| anyhow::anyhow!("No DB pool provided"))?; - let job_id = payload.job_id.unwrap(); - let payload_json = serde_json::to_value(payload)?; + let payload_json = serde_json::to_value(task)?; match worker_output { Ok(output) => { @@ -56,7 +56,7 @@ pub async fn save_to_db( RETURNING id "#, payload_json, - job_id, + bulk_job_id, backend_name, output_json, ) @@ -71,7 +71,7 @@ pub async fn save_to_db( RETURNING id "#, payload_json, - job_id, + bulk_job_id, backend_name, err.to_string(), ) @@ -80,7 +80,7 @@ pub async fn save_to_db( } } - debug!(target: LOG_TARGET, email=?payload.input.to_email, "Wrote to DB"); + debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB"); Ok(()) } diff --git a/docker-compose.local.yaml b/docker-compose.local.yaml index 47e5abbcc..d23883553 100644 --- a/docker-compose.local.yaml +++ b/docker-compose.local.yaml @@ -27,39 +27,37 @@ services: - ./backend/postgres_data:/var/lib/postgresql/data restart: always - worker_smtp: + worker1: build: context: . dockerfile: backend/Dockerfile - container_name: worker_smtp + container_name: worker1 ports: - "8080:8080" depends_on: - postgres - rabbitmq environment: - RCH__BACKEND_NAME: worker_smtp + RCH__BACKEND_NAME: worker1 RUST_LOG: reacher=info - RCH__HTTP_HOST: 0.0.0.0 RCH__WORKER__ENABLE: true RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672 RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db restart: always - worker_headless: + worker2: build: context: .
dockerfile: backend/Dockerfile - container_name: worker_headless + container_name: worker2 ports: - "8081:8080" depends_on: - postgres - rabbitmq environment: - RCH__BACKEND_NAME: worker_headless + RCH__BACKEND_NAME: worker2 RUST_LOG: reacher=info - RCH__HTTP_HOST: 0.0.0.0 RCH__WORKER__ENABLE: true RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672 RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db diff --git a/docker-compose.yaml b/docker-compose.yaml index fe207e6ca..80ab8444c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -23,40 +23,36 @@ services: POSTGRES_DB: reacher_db restart: always - worker_smtp: + worker1: image: reacherhq/backend:beta - container_name: worker_smtp + container_name: worker1 ports: - "8080:8080" depends_on: - postgres - rabbitmq environment: - RCH__BACKEND_NAME: worker_smtp + RCH__BACKEND_NAME: worker1 RUST_LOG: reacher=info - RCH__HTTP_HOST: 0.0.0.0 RCH__WORKER__ENABLE: true RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672 - RCH__WORKER__RABBITMQ__QUEUES: check.gmail,check.hotmailb2b,check.everything_else RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_DAY: 10000 # Recommended limit per IP per day for SMTP requests restart: always - worker_headless: + worker2: image: reacherhq/backend:beta - container_name: worker_headless + container_name: worker2 ports: - "8081:8080" depends_on: - postgres - rabbitmq environment: - RCH__BACKEND_NAME: worker_headless + RCH__BACKEND_NAME: worker2 RUST_LOG: reacher=info - RCH__HTTP_HOST: 0.0.0.0 RCH__WORKER__ENABLE: true RCH__WORKER__RABBITMQ__URL: amqp://guest:guest@rabbitmq:5672 - RCH__WORKER__RABBITMQ__QUEUES: check.hotmailb2c,check.yahoo RCH__WORKER__POSTGRES__DB_URL: postgres://postgres:postgres@postgres:5432/reacher_db RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_MINUTE: 100 # Recommended limit for headless verifications restart: always
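Both compose files now define interchangeable workers consuming the same "check_email" queue, so capacity is scaled by adding identical services rather than by assigning provider-specific queues. For reference, a condensed sketch of the consume loop each such worker runs, assuming lapin 2.x and mirroring the durable, x-max-priority queue settings of setup_rabbit_mq above (error handling and the actual email verification omitted):

    use futures::StreamExt;
    use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let conn = Connection::connect(
            "amqp://guest:guest@rabbitmq:5672",
            ConnectionProperties::default(),
        )
        .await?;
        let channel = conn.create_channel().await?;

        // Durable priority queue, same arguments as setup_rabbit_mq.
        let mut args = FieldTable::default();
        args.insert("x-max-priority".into(), 5u8.into());
        channel
            .queue_declare(
                "check_email",
                QueueDeclareOptions { durable: true, ..Default::default() },
                args,
            )
            .await?;

        // At most 5 unacked messages in flight, matching `concurrency = 5`.
        channel.basic_qos(5, BasicQosOptions { global: true }).await?;

        let mut consumer = channel
            .basic_consume(
                "check_email",
                "worker1-check_email", // consumer tag: "{backend_name}-{queue}"
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        while let Some(delivery) = consumer.next().await {
            let delivery = delivery?;
            // ... deserialize the CheckEmailTask and verify the email here ...
            delivery.ack(BasicAckOptions::default()).await?;
        }
        Ok(())
    }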