Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(backend): Remove "preprocess" queue #1539

Merged
merged 3 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions backend/backend_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,10 @@ enable = true
# Env variable: RCH__WORKER__RABBITMQ__URL
url = "amqp://guest:guest@localhost:5672"

# Queues to consume emails from. By default, the worker consumes from all
# queues.
#
# To consume from only a subset of queues, uncomment the line `queues = "all"`
# and specify the queues you want to consume from.
#
# Below is the exhaustive list of queue names that the worker can consume from:
# - "check.gmail": subscribe exclusively to Gmail emails.
# - "check.hotmailb2b": subscribe exclusively to Hotmail B2B emails.
# - "check.hotmailb2c": subscribe exclusively to Hotmail B2C emails.
# - "check.yahoo": subscribe exclusively to Yahoo emails.
# - "check.everything_else": subscribe to all emails that are not Gmail, Yahoo, or Hotmail.
#
# Env variable: RCH__WORKER__RABBITMQ__QUEUES
#
# queues = ["check.gmail", "check.hotmailb2b", "check.hotmailb2c", "check.yahoo", "check.everything_else"]
queues = "all"

# Number of concurrent emails to verify for this worker across all queues.
# Number of concurrent emails to verify for this worker.
#
# Env variable: RCH__WORKER__RABBITMQ__CONCURRENCY
concurrency = 20
concurrency = 5

# Throttle the maximum number of requests per second, per minute, per hour, and
# per day for this worker.
Expand Down
227 changes: 17 additions & 210 deletions backend/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::create_db;
#[cfg(feature = "worker")]
use crate::worker::check_email::TaskWebhook;
use crate::worker::do_work::TaskWebhook;
#[cfg(feature = "worker")]
use crate::worker::setup_rabbit_mq;
use anyhow::bail;
Expand All @@ -27,12 +27,11 @@ use check_if_email_exists::{
use config::Config;
#[cfg(feature = "worker")]
use lapin::Channel;
use serde::de::{self, Deserializer, Visitor};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::env;
#[cfg(feature = "worker")]
use std::sync::Arc;
use std::{env, fmt};
use tracing::warn;

#[derive(Debug, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -70,10 +69,7 @@ pub struct BackendConfig {
pg_pool: Option<PgPool>,
#[cfg(feature = "worker")]
#[serde(skip)]
check_email_channel: Option<Arc<Channel>>,
#[cfg(feature = "worker")]
#[serde(skip)]
preprocess_channel: Option<Arc<Channel>>,
channel: Option<Arc<Channel>>,
}

impl BackendConfig {
Expand All @@ -90,9 +86,7 @@ impl BackendConfig {
&self.worker.postgres,
&self.pg_pool,
#[cfg(feature = "worker")]
&self.check_email_channel,
#[cfg(feature = "worker")]
&self.preprocess_channel,
&self.channel,
) {
#[cfg(feature = "worker")]
(
Expand All @@ -101,29 +95,25 @@ impl BackendConfig {
Some(rabbitmq),
Some(postgres),
Some(pg_pool),
Some(check_email_channel),
Some(preprocess_channel),
Some(channel),
) => Ok(MustWorkerConfig {
pg_pool: pg_pool.clone(),
#[cfg(feature = "worker")]
check_email_channel: check_email_channel.clone(),
#[cfg(feature = "worker")]
preprocess_channel: preprocess_channel.clone(),
channel: channel.clone(),
throttle: throttle.clone(),
rabbitmq: rabbitmq.clone(),
#[cfg(feature = "worker")]
webhook: self.worker.webhook.clone(),
postgres: postgres.clone(),
}),
#[cfg(feature = "worker")]
(true, _, _, _, _, _, _) => bail!("Worker configuration is missing"),
(true, _, _, _, _, _) => bail!("Worker configuration is missing"),
_ => bail!("Calling must_worker_config on a non-worker backend"),
}
}

/// Attempt connection to the Postgres database and RabbitMQ. Also populates
/// the internal `pg_pool1, `check_email_channel` and `preprocess_channel`
/// fields with the connections.
/// the internal `pg_pool` and `channel` fields with the connections.
pub async fn connect(&mut self) -> Result<(), anyhow::Error> {
let pg_pool = if self.worker.enable {
let db_url = self
Expand All @@ -145,21 +135,16 @@ impl BackendConfig {

#[cfg(feature = "worker")]
{
let (check_email_channel, preprocess_channel) = if self.worker.enable {
let channel = if self.worker.enable {
let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| {
anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration")
})?;
let (check_email_channel, preprocess_channel) =
setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
(
Some(Arc::new(check_email_channel)),
Some(Arc::new(preprocess_channel)),
)
let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
Some(Arc::new(channel))
} else {
(None, None)
None
};
self.check_email_channel = check_email_channel;
self.preprocess_channel = preprocess_channel;
self.channel = channel;
}

Ok(())
Expand All @@ -168,16 +153,6 @@ impl BackendConfig {
pub fn get_pg_pool(&self) -> Option<PgPool> {
self.pg_pool.clone()
}

#[cfg(feature = "worker")]
pub fn get_check_email_channel(&self) -> Option<Arc<Channel>> {
self.check_email_channel.clone()
}

#[cfg(feature = "worker")]
pub fn get_preprocess_channel(&self) -> Option<Arc<Channel>> {
self.preprocess_channel.clone()
}
}

#[derive(Debug, Default, Deserialize, Clone, Serialize)]
Expand Down Expand Up @@ -213,9 +188,7 @@ pub struct WorkerConfig {
pub struct MustWorkerConfig {
pub pg_pool: PgPool,
#[cfg(feature = "worker")]
pub check_email_channel: Arc<Channel>,
#[cfg(feature = "worker")]
pub preprocess_channel: Arc<Channel>,
pub channel: Arc<Channel>,

pub throttle: ThrottleConfig,
pub rabbitmq: RabbitMQConfig,
Expand All @@ -224,162 +197,13 @@ pub struct MustWorkerConfig {
pub postgres: PostgresConfig,
}

#[derive(Debug, Clone, Serialize)]
pub enum RabbitMQQueues {
All,
Only(Vec<Queue>),
}

/// Deserialize RabbitMQQueues from a string or a list of strings.
/// If the value is "all", then we return RabbitMQQueues::All.
/// If the value is a list of strings, then we return RabbitMQQueues::Only.
impl<'de> Deserialize<'de> for RabbitMQQueues {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct RabbitMQQueuesVisitor;

impl<'de> Visitor<'de> for RabbitMQQueuesVisitor {
type Value = RabbitMQQueues;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string 'all' or a list of queue strings")
}

fn visit_str<E>(self, value: &str) -> Result<RabbitMQQueues, E>
where
E: de::Error,
{
if value == "all" {
Ok(RabbitMQQueues::All)
} else {
Err(de::Error::unknown_variant(value, &["all"]))
}
}

fn visit_seq<A>(self, mut seq: A) -> Result<RabbitMQQueues, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let mut queues = Vec::new();
while let Some(value) = seq.next_element()? {
queues.push(value);
}
Ok(RabbitMQQueues::Only(queues))
}
}

deserializer.deserialize_any(RabbitMQQueuesVisitor)
}
}

impl RabbitMQQueues {
pub fn to_queues(&self) -> Vec<Queue> {
match self {
RabbitMQQueues::All => vec![
Queue::Gmail,
Queue::HotmailB2B,
Queue::HotmailB2C,
Queue::Yahoo,
Queue::EverythingElse,
],
RabbitMQQueues::Only(queues) => queues.clone(),
}
}
}

#[derive(Debug, Deserialize, Clone, Serialize)]
pub struct RabbitMQConfig {
pub url: String,
/// Queues to consume emails from. By default the worker consumes from all
/// queues.
///
/// If you wish to consume from only a subset of queues, you can uncomment
/// the line `queues = "all"`, and then specify the queues you want to
/// consume from.
///
/// Below is the exhaustive list of queue names that the worker can consume from:
/// - "check.gmail": subcribe exclusively to Gmail emails.
/// - "check.hotmailb2b": subcribe exclusively to Hotmail B2B emails.
/// - "check.hotmailb2c": subcribe exclusively to Hotmail B2C emails.
/// - "check.yahoo": subcribe exclusively to Yahoo emails.
/// - "check.everything_else": subcribe to all emails that are not Gmail, Yahoo, or Hotmail.
///
/// queues = ["check.gmail", "check.hotmailb2b", "check.hotmailb2c", "check.yahoo", "check.everything_else"]
pub queues: RabbitMQQueues,
/// Total number of concurrent messages that the worker can process, across
/// all queues.
/// Total number of concurrent messages that the worker can process.
pub concurrency: u16,
}

/// Queue names that the worker can consume from. Each email is routed to a
/// one and only one queue, based on the email provider. A single worker can
/// consume from multiple queues.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum Queue {
Gmail,
HotmailB2B,
HotmailB2C,
Yahoo,
EverythingElse,
}

impl fmt::Display for Queue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Queue::Gmail => write!(f, "check.gmail"),
Queue::HotmailB2B => write!(f, "check.hotmailb2b"),
Queue::HotmailB2C => write!(f, "check.hotmailb2c"),
Queue::Yahoo => write!(f, "check.yahoo"),
Queue::EverythingElse => write!(f, "check.everything_else"),
}
}
}

// Implement Deserialize for the Queue enum
impl<'de> Deserialize<'de> for Queue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct QueueVisitor;

impl<'de> Visitor<'de> for QueueVisitor {
type Value = Queue;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid queue string")
}

fn visit_str<E>(self, value: &str) -> Result<Queue, E>
where
E: de::Error,
{
match value {
"check.gmail" => Ok(Queue::Gmail),
"check.hotmailb2b" => Ok(Queue::HotmailB2B),
"check.hotmailb2c" => Ok(Queue::HotmailB2C),
"check.yahoo" => Ok(Queue::Yahoo),
"check.everything_else" => Ok(Queue::EverythingElse),
_ => Err(de::Error::unknown_variant(
value,
&[
"check.gmail",
"check.hotmailb2b",
"check.hotmailb2c",
"check.yahoo",
"check.everything_else",
],
)),
}
}
}

deserializer.deserialize_str(QueueVisitor)
}
}

#[derive(Debug, Deserialize, Clone, Serialize)]
pub struct PostgresConfig {
pub db_url: String,
Expand Down Expand Up @@ -408,19 +232,10 @@ impl ThrottleConfig {
/// Load the worker configuration from the worker_config.toml file and from the
/// environment.
pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {
let mut cfg = Config::builder()
let cfg = Config::builder()
.add_source(config::File::with_name("backend_config"))
.add_source(config::Environment::with_prefix("RCH").separator("__"));

// The RCH__WORKER__RABBITMQ__QUEUES is always read as a str, whereas we
// sometimes want to parse it as a list of strings, if it's not "all". We
// handle this case separately.
if let Ok(queues) = env::var("RCH__WORKER__RABBITMQ__QUEUES") {
if queues != "all" {
let queues: Vec<String> = queues.split(',').map(String::from).collect();
cfg = cfg.set_override("worker.rabbitmq.queues", queues)?;
}
}
let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?;

if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) {
Expand All @@ -432,21 +247,13 @@ pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {

#[cfg(test)]
mod tests {
use super::{load_config, Queue};
use super::load_config;
use std::env;

#[tokio::test]
async fn test_env_vars() {
env::set_var("RCH__BACKEND_NAME", "test-backend");
env::set_var(
"RCH__WORKER__RABBITMQ__QUEUES",
"check.gmail,check.hotmailb2b",
);
let cfg = load_config().await.unwrap();
assert_eq!(cfg.backend_name, "test-backend");
assert_eq!(
cfg.worker.rabbitmq.unwrap().queues.to_queues(),
vec![Queue::Gmail, Queue::HotmailB2B]
);
}
}
6 changes: 6 additions & 0 deletions backend/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ impl From<warp::http::status::InvalidStatusCode> for ReacherResponseError {
}
}

impl From<anyhow::Error> for ReacherResponseError {
fn from(e: anyhow::Error) -> Self {
ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}

/// This function receives a `Rejection` and tries to return a custom value,
/// otherwise simply passes the rejection along.
pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/http/v0/check_email/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::config::BackendConfig;
use crate::http::{check_header, ReacherResponseError};

/// The request body for the `POST /v0/check_email` endpoint.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct CheckEmailRequest {
pub to_email: String,
pub from_email: Option<String>,
Expand Down
Loading
Loading