Skip to content

Commit

Permalink
refactor(backend): Remove "preprocess" queue (#1539)
Browse files Browse the repository at this point in the history
* Remove preprocess

* Fix build

* remove unused
  • Loading branch information
amaury1093 authored Nov 30, 2024
1 parent ed19166 commit 34b1db8
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 592 deletions.
22 changes: 2 additions & 20 deletions backend/backend_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,10 @@ enable = true
# Env variable: RCH__WORKER__RABBITMQ__URL
url = "amqp://guest:guest@localhost:5672"

# Queues to consume emails from. By default, the worker consumes from all
# queues.
#
# To consume from only a subset of queues, comment out the line `queues = "all"`,
# then uncomment the `queues = [...]` line below and list the queues you want to consume from.
#
# Below is the exhaustive list of queue names that the worker can consume from:
# - "check.gmail": subscribe exclusively to Gmail emails.
# - "check.hotmailb2b": subscribe exclusively to Hotmail B2B emails.
# - "check.hotmailb2c": subscribe exclusively to Hotmail B2C emails.
# - "check.yahoo": subscribe exclusively to Yahoo emails.
# - "check.everything_else": subscribe to all emails that are not Gmail, Yahoo, or Hotmail.
#
# Env variable: RCH__WORKER__RABBITMQ__QUEUES
#
# queues = ["check.gmail", "check.hotmailb2b", "check.hotmailb2c", "check.yahoo", "check.everything_else"]
queues = "all"

# Number of concurrent emails to verify for this worker across all queues.
# Number of concurrent emails to verify for this worker.
#
# Env variable: RCH__WORKER__RABBITMQ__CONCURRENCY
concurrency = 20
concurrency = 5

# Throttle the maximum number of requests per second, per minute, per hour, and
# per day for this worker.
Expand Down
227 changes: 17 additions & 210 deletions backend/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::create_db;
#[cfg(feature = "worker")]
use crate::worker::check_email::TaskWebhook;
use crate::worker::do_work::TaskWebhook;
#[cfg(feature = "worker")]
use crate::worker::setup_rabbit_mq;
use anyhow::bail;
Expand All @@ -27,12 +27,11 @@ use check_if_email_exists::{
use config::Config;
#[cfg(feature = "worker")]
use lapin::Channel;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::env;
#[cfg(feature = "worker")]
use std::sync::Arc;
use std::{env, fmt};
use tracing::warn;

#[derive(Debug, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -70,10 +69,7 @@ pub struct BackendConfig {
pg_pool: Option<PgPool>,
#[cfg(feature = "worker")]
#[serde(skip)]
check_email_channel: Option<Arc<Channel>>,
#[cfg(feature = "worker")]
#[serde(skip)]
preprocess_channel: Option<Arc<Channel>>,
channel: Option<Arc<Channel>>,
}

impl BackendConfig {
Expand All @@ -90,9 +86,7 @@ impl BackendConfig {
&self.worker.postgres,
&self.pg_pool,
#[cfg(feature = "worker")]
&self.check_email_channel,
#[cfg(feature = "worker")]
&self.preprocess_channel,
&self.channel,
) {
#[cfg(feature = "worker")]
(
Expand All @@ -101,29 +95,25 @@ impl BackendConfig {
Some(rabbitmq),
Some(postgres),
Some(pg_pool),
Some(check_email_channel),
Some(preprocess_channel),
Some(channel),
) => Ok(MustWorkerConfig {
pg_pool: pg_pool.clone(),
#[cfg(feature = "worker")]
check_email_channel: check_email_channel.clone(),
#[cfg(feature = "worker")]
preprocess_channel: preprocess_channel.clone(),
channel: channel.clone(),
throttle: throttle.clone(),
rabbitmq: rabbitmq.clone(),
#[cfg(feature = "worker")]
webhook: self.worker.webhook.clone(),
postgres: postgres.clone(),
}),
#[cfg(feature = "worker")]
(true, _, _, _, _, _, _) => bail!("Worker configuration is missing"),
(true, _, _, _, _, _) => bail!("Worker configuration is missing"),
_ => bail!("Calling must_worker_config on a non-worker backend"),
}
}

/// Attempt connection to the Postgres database and RabbitMQ. Also populates
/// the internal `pg_pool`, `check_email_channel` and `preprocess_channel`
/// fields with the connections.
/// the internal `pg_pool` and `channel` fields with the connections.
pub async fn connect(&mut self) -> Result<(), anyhow::Error> {
let pg_pool = if self.worker.enable {
let db_url = self
Expand All @@ -145,21 +135,16 @@ impl BackendConfig {

#[cfg(feature = "worker")]
{
let (check_email_channel, preprocess_channel) = if self.worker.enable {
let channel = if self.worker.enable {
let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| {
anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration")
})?;
let (check_email_channel, preprocess_channel) =
setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
(
Some(Arc::new(check_email_channel)),
Some(Arc::new(preprocess_channel)),
)
let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
Some(Arc::new(channel))
} else {
(None, None)
None
};
self.check_email_channel = check_email_channel;
self.preprocess_channel = preprocess_channel;
self.channel = channel;
}

Ok(())
Expand All @@ -168,16 +153,6 @@ impl BackendConfig {
/// Returns a clone of the stored Postgres connection pool handle, or `None`
/// when no pool is currently stored on this config.
pub fn get_pg_pool(&self) -> Option<PgPool> {
self.pg_pool.clone()
}

/// Returns a clone of the shared (`Arc`) handle to the `check_email` channel,
/// or `None` when no channel is currently stored on this config.
#[cfg(feature = "worker")]
pub fn get_check_email_channel(&self) -> Option<Arc<Channel>> {
self.check_email_channel.clone()
}

/// Returns a clone of the shared (`Arc`) handle to the `preprocess` channel,
/// or `None` when no channel is currently stored on this config.
#[cfg(feature = "worker")]
pub fn get_preprocess_channel(&self) -> Option<Arc<Channel>> {
self.preprocess_channel.clone()
}
}

#[derive(Debug, Default, Deserialize, Clone, Serialize)]
Expand Down Expand Up @@ -213,9 +188,7 @@ pub struct WorkerConfig {
pub struct MustWorkerConfig {
pub pg_pool: PgPool,
#[cfg(feature = "worker")]
pub check_email_channel: Arc<Channel>,
#[cfg(feature = "worker")]
pub preprocess_channel: Arc<Channel>,
pub channel: Arc<Channel>,

pub throttle: ThrottleConfig,
pub rabbitmq: RabbitMQConfig,
Expand All @@ -224,162 +197,13 @@ pub struct MustWorkerConfig {
pub postgres: PostgresConfig,
}

/// Selection of queues a worker consumes from: either every known queue or
/// an explicit list.
#[derive(Debug, Clone, Serialize)]
pub enum RabbitMQQueues {
/// Consume from every queue.
All,
/// Consume only from the listed queues.
Only(Vec<Queue>),
}

/// Accepts either the literal string `"all"` or a sequence of queue names.
///
/// - The string `"all"` becomes [`RabbitMQQueues::All`]; any other string is
///   rejected as an unknown variant.
/// - A sequence becomes [`RabbitMQQueues::Only`], holding every element of
///   the sequence in order.
impl<'de> Deserialize<'de> for RabbitMQQueues {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        /// Visitor that understands both the string and the list form.
        struct QueuesVisitor;

        impl<'de> Visitor<'de> for QueuesVisitor {
            type Value = RabbitMQQueues;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                f.write_str("a string 'all' or a list of queue strings")
            }

            fn visit_str<E>(self, raw: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                match raw {
                    "all" => Ok(RabbitMQQueues::All),
                    other => Err(E::unknown_variant(other, &["all"])),
                }
            }

            fn visit_seq<A>(self, mut access: A) -> Result<Self::Value, A::Error>
            where
                A: de::SeqAccess<'de>,
            {
                // Drain the sequence element by element; the first element
                // that fails to deserialize aborts the whole list.
                let mut selected = Vec::new();
                while let Some(queue) = access.next_element()? {
                    selected.push(queue);
                }
                Ok(RabbitMQQueues::Only(selected))
            }
        }

        // The input may be a string or a sequence, so let the format decide
        // which visitor method to call.
        deserializer.deserialize_any(QueuesVisitor)
    }
}

impl RabbitMQQueues {
    /// Expands this selection into a concrete list of queues.
    ///
    /// An explicit `Only` list is returned as an owned copy, in its original
    /// order; `All` expands to every queue variant.
    pub fn to_queues(&self) -> Vec<Queue> {
        match self {
            Self::Only(selected) => selected.to_vec(),
            Self::All => vec![
                Queue::Gmail,
                Queue::HotmailB2B,
                Queue::HotmailB2C,
                Queue::Yahoo,
                Queue::EverythingElse,
            ],
        }
    }
}

/// RabbitMQ settings for the worker.
#[derive(Debug, Deserialize, Clone, Serialize)]
pub struct RabbitMQConfig {
/// URL of the RabbitMQ server, e.g. `amqp://guest:guest@localhost:5672`.
pub url: String,
/// Queues to consume emails from. By default the worker consumes from all
/// queues.
///
/// If you wish to consume from only a subset of queues, comment out the
/// line `queues = "all"`, and then specify the queues you want to consume
/// from as a list.
///
/// Below is the exhaustive list of queue names that the worker can consume from:
/// - "check.gmail": subscribe exclusively to Gmail emails.
/// - "check.hotmailb2b": subscribe exclusively to Hotmail B2B emails.
/// - "check.hotmailb2c": subscribe exclusively to Hotmail B2C emails.
/// - "check.yahoo": subscribe exclusively to Yahoo emails.
/// - "check.everything_else": subscribe to all emails that are not Gmail, Yahoo, or Hotmail.
///
/// Example:
/// queues = ["check.gmail", "check.hotmailb2b", "check.hotmailb2c", "check.yahoo", "check.everything_else"]
pub queues: RabbitMQQueues,
/// Total number of concurrent messages that the worker can process, across
/// all queues.
pub concurrency: u16,
}

/// Queue names that the worker can consume from. Each email is routed to
/// one and only one queue, based on the email provider. A single worker can
/// consume from multiple queues.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum Queue {
/// The "check.gmail" queue.
Gmail,
/// The "check.hotmailb2b" queue.
HotmailB2B,
/// The "check.hotmailb2c" queue.
HotmailB2C,
/// The "check.yahoo" queue.
Yahoo,
/// The "check.everything_else" queue.
EverythingElse,
}

impl fmt::Display for Queue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Queue::Gmail => write!(f, "check.gmail"),
Queue::HotmailB2B => write!(f, "check.hotmailb2b"),
Queue::HotmailB2C => write!(f, "check.hotmailb2c"),
Queue::Yahoo => write!(f, "check.yahoo"),
Queue::EverythingElse => write!(f, "check.everything_else"),
}
}
}

/// Deserializes a [`Queue`] from its queue-name string (e.g. `"check.gmail"`).
/// Any other string is rejected as an unknown variant, with the accepted
/// names listed in the error.
impl<'de> Deserialize<'de> for Queue {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        /// Every accepted queue name, reported back when parsing fails.
        const QUEUE_NAMES: &[&str] = &[
            "check.gmail",
            "check.hotmailb2b",
            "check.hotmailb2c",
            "check.yahoo",
            "check.everything_else",
        ];

        /// Visitor mapping a queue-name string onto a `Queue` variant.
        struct NameVisitor;

        impl<'de> Visitor<'de> for NameVisitor {
            type Value = Queue;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                f.write_str("a valid queue string")
            }

            fn visit_str<E>(self, name: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                let queue = match name {
                    "check.gmail" => Queue::Gmail,
                    "check.hotmailb2b" => Queue::HotmailB2B,
                    "check.hotmailb2c" => Queue::HotmailB2C,
                    "check.yahoo" => Queue::Yahoo,
                    "check.everything_else" => Queue::EverythingElse,
                    unknown => return Err(E::unknown_variant(unknown, QUEUE_NAMES)),
                };
                Ok(queue)
            }
        }

        deserializer.deserialize_str(NameVisitor)
    }
}

#[derive(Debug, Deserialize, Clone, Serialize)]
pub struct PostgresConfig {
pub db_url: String,
Expand Down Expand Up @@ -408,19 +232,10 @@ impl ThrottleConfig {
/// Load the backend configuration from the backend_config.toml file and from the
/// environment.
pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {
let mut cfg = Config::builder()
let cfg = Config::builder()
.add_source(config::File::with_name("backend_config"))
.add_source(config::Environment::with_prefix("RCH").separator("__"));

// The RCH__WORKER__RABBITMQ__QUEUES is always read as a str, whereas we
// sometimes want to parse it as a list of strings, if it's not "all". We
// handle this case separately.
if let Ok(queues) = env::var("RCH__WORKER__RABBITMQ__QUEUES") {
if queues != "all" {
let queues: Vec<String> = queues.split(',').map(String::from).collect();
cfg = cfg.set_override("worker.rabbitmq.queues", queues)?;
}
}
let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?;

if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) {
Expand All @@ -432,21 +247,13 @@ pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {

#[cfg(test)]
mod tests {
use super::{load_config, Queue};
use super::load_config;
use std::env;

/// Checks that `RCH__`-prefixed environment variables are reflected in the
/// configuration returned by `load_config`.
#[tokio::test]
async fn test_env_vars() {
// Set the backend name through the environment.
env::set_var("RCH__BACKEND_NAME", "test-backend");
// Restrict the queues with a comma-separated list of queue names.
env::set_var(
"RCH__WORKER__RABBITMQ__QUEUES",
"check.gmail,check.hotmailb2b",
);
let cfg = load_config().await.unwrap();
assert_eq!(cfg.backend_name, "test-backend");
// The comma-separated value must come back as the two matching queues.
assert_eq!(
cfg.worker.rabbitmq.unwrap().queues.to_queues(),
vec![Queue::Gmail, Queue::HotmailB2B]
);
}
}
6 changes: 6 additions & 0 deletions backend/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ impl From<warp::http::status::InvalidStatusCode> for ReacherResponseError {
}
}

impl From<anyhow::Error> for ReacherResponseError {
fn from(e: anyhow::Error) -> Self {
ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}

/// This function receives a `Rejection` and tries to return a custom value,
/// otherwise simply passes the rejection along.
pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/http/v0/check_email/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::config::BackendConfig;
use crate::http::{check_header, ReacherResponseError};

/// The request body for the `POST /v0/check_email` endpoint.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct CheckEmailRequest {
pub to_email: String,
pub from_email: Option<String>,
Expand Down
Loading

0 comments on commit 34b1db8

Please sign in to comment.