Skip to content

Commit

Permalink
refactor: Add throttle as a global config (#1547)
Browse files Browse the repository at this point in the history
* cursor v1

* fix default

* cleanup

* add docs for worker

* fix bulk and worker

* add logs

* better throttle logs

* remove warn

* better errors

* better error display

* fmt

* clippy

* fix test
  • Loading branch information
amaury1093 authored Dec 14, 2024
1 parent 5e2eeb3 commit a4efecb
Show file tree
Hide file tree
Showing 18 changed files with 511 additions and 239 deletions.
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Run the backend without worker mode, i.e. only enabling single-shot
# verifications via the /v1/check_email endpoint.
.PHONY: run
run:
	cd backend && cargo run --bin reacher_backend


# Run the backend with worker mode on. This enables the /v1/bulk endpoints.
# Make sure to have a Postgres DB and a RabbitMQ instance running.
.PHONY: run-with-worker
run-with-worker: export RCH__WORKER__ENABLE=true
run-with-worker: export RCH__WORKER__RABBITMQ__URL=amqp://guest:guest@localhost:5672
run-with-worker: export RCH__STORAGE__POSTGRES__DB_URL=postgresql://localhost/reacherdb
run-with-worker: run

# Run the backend pointing at a local commercial-license-trial endpoint.
# Assumes a service is listening on localhost:3000 serving that API; all
# other settings fall back to the plain `run` target's defaults.
.PHONY: run-with-commercial-license-trial
run-with-commercial-license-trial: export RCH__COMMERCIAL_LICENSE_TRIAL__URL=http://localhost:3000/api/v1/commercial_license_trial
run-with-commercial-license-trial: run
56 changes: 36 additions & 20 deletions backend/backend_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,47 @@ hotmailb2c = "headless"
# recommended.
yahoo = "headless"

# Throttle the maximum number of requests per second, per minute, per hour, and
# per day for this worker.
# All fields are optional; comment them out to disable the limit.
#
# We however recommend setting the throttle for at least the per-minute and
# per-day limits to prevent the IPs from being blocked by the email providers.
# The default values are set to 60 requests per minute and 10,000 requests per
# day.
#
# Important: these throttle configurations only apply to /v1/* endpoints, and
# not to the previous /v0/check_email endpoint. The latter endpoint always
# executes the verification immediately, regardless of the throttle settings.
#
# Env variables:
# - RCH__THROTTLE__MAX_REQUESTS_PER_SECOND
# - RCH__THROTTLE__MAX_REQUESTS_PER_MINUTE
# - RCH__THROTTLE__MAX_REQUESTS_PER_HOUR
# - RCH__THROTTLE__MAX_REQUESTS_PER_DAY
[throttle]
# max_requests_per_second = 20
max_requests_per_minute = 60
# max_requests_per_hour = 1000
max_requests_per_day = 10000

# Configuration for a queue-based architecture for Reacher. This feature is
# currently in **beta**. The queue-based architecture allows Reacher to scale
# horizontally by running multiple workers that consume emails from a RabbitMQ
# queue.
#
# To enable the queue-based architecture, set the "enable" field to "true" and
# configure the RabbitMQ connection below. The "concurrency" field specifies
# the number of concurrent emails to verify for this worker.
#
# For more information, see the documentation at:
# https://docs.reacher.email/self-hosting/scaling-for-production
[worker]
# Enable the worker to consume emails from the RabbitMQ queues. If set, the
# RabbitMQ configuration below must be set as well.
#
# Env variable: RCH__WORKER__ENABLE
enable = true
enable = false

# RabbitMQ configuration.
[worker.rabbitmq]
Expand All @@ -105,25 +140,6 @@ url = "amqp://guest:guest@localhost:5672"
# Env variable: RCH__WORKER__RABBITMQ__CONCURRENCY
concurrency = 5

# Throttle the maximum number of requests per second, per minute, per hour, and
# per day for this worker.
# All fields are optional; comment them out to disable the limit.
#
# Important: these throttle configurations only apply to /v1/* endpoints, and
# not to the previous /v0/check_email endpoint. The latter endpoint always
# executes the verification immediately, regardless of the throttle settings.
#
# Env variables:
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_SECOND
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_MINUTE
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_HOUR
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_DAY
[worker.throttle]
# max_requests_per_second = 20
# max_requests_per_minute = 100
# max_requests_per_hour = 1000
# max_requests_per_day = 20000

# Below are the configurations for the storage of the email verification
# results. We currently support the following storage backends:
# - Postgres
Expand Down
4 changes: 2 additions & 2 deletions backend/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@
"duration": {
"$ref": "#/components/schemas/Duration"
},
"server_name": {
"backend_name": {
"type": "string",
"x-stoplight": {
"id": "2jrbdecvqh4t5"
Expand All @@ -717,7 +717,7 @@
"start_time",
"end_time",
"duration",
"server_name",
"backend_name",
"smtp"
]
},
Expand Down
71 changes: 52 additions & 19 deletions backend/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::storage::{postgres::PostgresStorage, StorageAdapter};
use crate::throttle::ThrottleManager;
use crate::worker::do_work::TaskWebhook;
use crate::worker::setup_rabbit_mq;
use anyhow::{bail, Context};
Expand All @@ -29,7 +30,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use tracing::warn;

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BackendConfig {
/// Name of the backend.
pub backend_name: String,
Expand Down Expand Up @@ -65,36 +66,61 @@ pub struct BackendConfig {
/// Whether to enable the Commercial License Trial. Setting this to true
pub commercial_license_trial: Option<CommercialLicenseTrialConfig>,

/// Throttle configuration for all requests
pub throttle: ThrottleConfig,

// Internal fields, not part of the configuration.
#[serde(skip)]
channel: Option<Arc<Channel>>,

#[serde(skip)]
storage_adapter: Arc<StorageAdapter>,

#[serde(skip)]
throttle_manager: Arc<ThrottleManager>,
}

impl BackendConfig {
/// Create an empty `BackendConfig`, with every optional field unset, no
/// throttling, and the no-op storage adapter. Useful for testing purposes.
pub fn empty() -> Self {
	// Build the throttle config once; it is both stored on the config and
	// used to seed the (no-op) throttle manager.
	let throttle = ThrottleConfig::new_without_throttle();
	let throttle_manager = Arc::new(ThrottleManager::new(throttle.clone()));

	Self {
		backend_name: String::new(),
		from_email: String::new(),
		hello_name: String::new(),
		webdriver_addr: String::new(),
		proxy: None,
		verif_method: VerifMethodConfig::default(),
		http_host: "127.0.0.1".to_string(),
		http_port: 8080,
		header_secret: None,
		smtp_timeout: None,
		sentry_dsn: None,
		worker: WorkerConfig::default(),
		storage: Some(StorageConfig::Noop),
		commercial_license_trial: None,
		throttle,
		channel: None,
		storage_adapter: Arc::new(StorageAdapter::Noop),
		throttle_manager,
	}
}

/// Get the worker configuration.
///
/// # Panics
///
/// Panics if the worker configuration is missing.
pub fn must_worker_config(&self) -> Result<MustWorkerConfig, anyhow::Error> {
match (
self.worker.enable,
&self.worker.throttle,
&self.worker.rabbitmq,
&self.channel,
) {
(true, Some(throttle), Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig {
match (self.worker.enable, &self.worker.rabbitmq, &self.channel) {
(true, Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig {
channel: channel.clone(),
throttle: throttle.clone(),
rabbitmq: rabbitmq.clone(),

webhook: self.worker.webhook.clone(),
}),

(true, _, _, _) => bail!("Worker configuration is missing"),
(true, _, _) => bail!("Worker configuration is missing"),
_ => bail!("Calling must_worker_config on a non-worker backend"),
}
}
Expand Down Expand Up @@ -126,6 +152,9 @@ impl BackendConfig {
};
self.channel = channel;

// Initialize throttle manager
self.throttle_manager = Arc::new(ThrottleManager::new(self.throttle.clone()));

Ok(())
}

Expand All @@ -142,6 +171,10 @@ impl BackendConfig {
StorageAdapter::Noop => None,
}
}

pub fn get_throttle_manager(&self) -> Arc<ThrottleManager> {
self.throttle_manager.clone()
}
}

#[derive(Debug, Default, Deserialize, Clone, Serialize)]
Expand All @@ -159,9 +192,6 @@ pub struct VerifMethodConfig {
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct WorkerConfig {
pub enable: bool,

/// Throttle configuration for the worker.
pub throttle: Option<ThrottleConfig>,
pub rabbitmq: Option<RabbitMQConfig>,
/// Optional webhook configuration to send email verification results.
pub webhook: Option<TaskWebhook>,
Expand All @@ -172,8 +202,6 @@ pub struct WorkerConfig {
#[derive(Debug, Clone)]
pub struct MustWorkerConfig {
pub channel: Arc<Channel>,

pub throttle: ThrottleConfig,
pub rabbitmq: RabbitMQConfig,
pub webhook: Option<TaskWebhook>,
}
Expand All @@ -185,7 +213,7 @@ pub struct RabbitMQConfig {
pub concurrency: u16,
}

#[derive(Debug, Deserialize, Clone, Serialize)]
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct ThrottleConfig {
pub max_requests_per_second: Option<u32>,
pub max_requests_per_minute: Option<u32>,
Expand Down Expand Up @@ -236,8 +264,13 @@ pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {

let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?;

if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) {
warn!(target: LOG_TARGET, "worker.enable is set to false, ignoring throttling and concurrency settings.")
if cfg.worker.enable {
warn!(target: LOG_TARGET, "The worker feature is currently in beta. Please send any feedback to amaury@reacher.email.");

match &cfg.storage {
Some(StorageConfig::Postgres(_)) => {}
_ => bail!("When worker mode is enabled, a Postgres database must be configured."),
}
}

Ok(cfg)
Expand Down
11 changes: 9 additions & 2 deletions backend/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait DisplayDebug: fmt::Display + Debug + Sync + Send {}
impl<T: fmt::Display + Debug + Sync + Send> DisplayDebug for T {}

/// Struct describing an error response.
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub struct ReacherResponseError {
pub code: StatusCode,
pub error: Box<dyn DisplayDebug>,
Expand Down Expand Up @@ -121,7 +121,14 @@ impl From<StorageError> for ReacherResponseError {

impl From<reqwest::Error> for ReacherResponseError {
fn from(e: reqwest::Error) -> Self {
ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
ReacherResponseError::new(
e.status()
.map(|s| s.as_u16())
.map(StatusCode::from_u16)
.and_then(Result::ok)
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
e,
)
}
}

Expand Down
20 changes: 0 additions & 20 deletions backend/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ use crate::config::BackendConfig;
use check_if_email_exists::LOG_TARGET;
use error::handle_rejection;
pub use error::ReacherResponseError;
use sqlx::PgPool;
use sqlxmq::JobRunnerHandle;
use std::env;
use std::net::IpAddr;
use std::sync::Arc;
use tracing::info;
pub use v0::check_email::post::CheckEmailRequest;
use warp::http::StatusCode;
use warp::Filter;

pub fn create_routes(
Expand Down Expand Up @@ -101,24 +99,6 @@ pub async fn run_warp_server(
Ok(runner)
}

/// Warp filter to add the database pool to the handler. If the pool is not
/// configured, it will return an error.
///
/// The `Option<PgPool>` is captured by the filter; each request clones the
/// (cheap, internally ref-counted) pool handle. When the pool is `None`,
/// the request is rejected with a 503 `ReacherResponseError` asking the
/// operator to configure Postgres, rather than panicking at request time.
pub fn with_db(
	pg_pool: Option<PgPool>,
) -> impl Filter<Extract = (PgPool,), Error = warp::Rejection> + Clone {
	warp::any().and_then(move || {
		// Clone before the async block so the closure stays `Fn` (warp
		// may invoke it once per request).
		let pool = pg_pool.clone();
		async move {
			pool.ok_or_else(|| {
				warp::reject::custom(ReacherResponseError::new(
					StatusCode::SERVICE_UNAVAILABLE,
					"Please configure a Postgres database on Reacher before calling this endpoint",
				))
			})
		}
	})
}

/// The header which holds the Reacher backend secret.
pub const REACHER_SECRET_HEADER: &str = "x-reacher-secret";

Expand Down
5 changes: 3 additions & 2 deletions backend/src/http/v1/bulk/get_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use sqlx::PgPool;
use warp::http::StatusCode;
use warp::Filter;

use super::with_worker_db;
use crate::config::BackendConfig;
use crate::http::{with_db, ReacherResponseError};
use crate::http::ReacherResponseError;

/// NOTE: Type conversions from postgres to rust types
/// are according to the table given by
Expand Down Expand Up @@ -149,7 +150,7 @@ pub fn v1_get_bulk_job_progress(
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v1" / "bulk" / i32)
.and(warp::get())
.and(with_db(config.get_pg_pool()))
.and(with_worker_db(config))
.and_then(http_handler)
// View access logs by setting `RUST_LOG=reacher`.
.with(warp::log(LOG_TARGET))
Expand Down
5 changes: 3 additions & 2 deletions backend/src/http/v1/bulk/get_results/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use std::{convert::TryInto, sync::Arc};
use warp::http::StatusCode;
use warp::Filter;

use super::with_worker_db;
use crate::config::BackendConfig;
use crate::http::{with_db, ReacherResponseError};
use crate::http::ReacherResponseError;
use csv_helper::{CsvResponse, CsvWrapper};

mod csv_helper;
Expand Down Expand Up @@ -180,7 +181,7 @@ pub fn v1_get_bulk_job_results(
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v1" / "bulk" / i32 / "results")
.and(warp::get())
.and(with_db(config.get_pg_pool()))
.and(with_worker_db(config))
.and(warp::query::<Request>())
.and_then(http_handler)
// View access logs by setting `RUST_LOG=reacher_backend`.
Expand Down
Loading

0 comments on commit a4efecb

Please sign in to comment.