Skip to content

Commit

Permalink
feat(backend): Add POST /v1/bulk (#1413)
Browse files Browse the repository at this point in the history
* Move bulk to v0

* Add v1 bulk route

* Make it compile

* Make workers work

* Make it work

* fix tests

* Add length in response
  • Loading branch information
amaury1093 authored Dec 22, 2023
1 parent fcffc1a commit d9302d4
Show file tree
Hide file tree
Showing 21 changed files with 276 additions and 34 deletions.
3 changes: 1 addition & 2 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ csv = "1.3.0"
dotenv = "0.15.0"
futures = { version = "0.3.29", optional = true }
futures-lite = { version = "2.1.0", optional = true }
lapin = { version = "2.3.1", optional = true }
lapin = { version = "2.3.1" }
log = "0.4"
openssl = { version = "0.10.57", features = ["vendored"] }
reqwest = { version = "0.11.22", features = ["json"], optional = true }
Expand Down Expand Up @@ -40,7 +40,6 @@ warp = "0.3"
worker = [
"futures",
"futures-lite",
"lapin",
"reqwest",
"tokio-executor-trait",
"tokio-reactor-trait",
Expand Down
38 changes: 29 additions & 9 deletions backend/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,57 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

mod bulk;
pub mod check_email;
mod v0;
mod v1;
mod version;

use std::env;
use std::net::IpAddr;

use check_if_email_exists::LOG_TARGET;
use lapin::Channel;
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use tracing::info;
use warp::Filter;

use super::errors;

#[cfg(feature = "worker")]
pub fn create_routes(
o: Option<Pool<Postgres>>,
channel: Option<Channel>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
version::get::get_version()
.or(check_email::post::post_check_email())
.or(v0::check_email::post::post_check_email())
.or(v1::bulk::post::create_bulk_job(channel))
// The 3 following routes will 404 if o is None.
.or(bulk::post::create_bulk_job(o.clone()))
.or(bulk::get::get_bulk_job_status(o.clone()))
.or(bulk::results::get_bulk_job_result(o))
.or(v0::bulk::post::create_bulk_job(o.clone()))
.or(v0::bulk::get::get_bulk_job_status(o.clone()))
.or(v0::bulk::results::get_bulk_job_result(o))
.recover(errors::handle_rejection)
}

#[cfg(not(feature = "worker"))]
pub fn create_routes(
o: Option<Pool<Postgres>>,
_channel: Option<Channel>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
version::get::get_version()
.or(v0::check_email::post::post_check_email())
// The 3 following routes will 404 if o is None.
.or(v0::bulk::post::create_bulk_job(o.clone()))
.or(v0::bulk::get::get_bulk_job_status(o.clone()))
.or(v0::bulk::results::get_bulk_job_result(o))
.recover(errors::handle_rejection)
}

/// Runs the Warp server.
///
/// This function starts the Warp server and listens for incoming requests.
/// It returns a `Result` indicating whether the server started successfully or encountered an error.
pub async fn run_warp_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
pub async fn run_warp_server(
channel: Option<Channel>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let host = env::var("RCH_HTTP_HOST")
.unwrap_or_else(|_| "127.0.0.1".into())
.parse::<IpAddr>()
Expand All @@ -59,13 +79,13 @@ pub async fn run_warp_server() -> Result<(), Box<dyn std::error::Error + Send +
let is_bulk_enabled = env::var("RCH_ENABLE_BULK").unwrap_or_else(|_| "0".into()) == "1";
let db = if is_bulk_enabled {
let pool = create_db().await?;
let _registry = bulk::create_job_registry(&pool).await?;
let _registry = v0::bulk::create_job_registry(&pool).await?;
Some(pool)
} else {
None
};

let routes = create_routes(db);
let routes = create_routes(db, channel);

info!(target: LOG_TARGET, host=?host,port=?port, "Server is listening");
warp::serve(routes).run((host, port)).await;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! This file implements the `POST /bulk` endpoint.
//! This file implements the `POST /v0/bulk` endpoint.
use check_if_email_exists::CheckEmailInputProxy;
use check_if_email_exists::LOG_TARGET;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! This file implements the `POST /check_email` endpoint.
//! This file implements the `POST /v0/check_email` endpoint.
use check_if_email_exists::CheckEmailInput;
use check_if_email_exists::LOG_TARGET;
Expand Down
2 changes: 2 additions & 0 deletions backend/src/http/v0/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod bulk;
pub mod check_email;
40 changes: 40 additions & 0 deletions backend/src/http/v1/bulk/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Reacher - Email Verification
// Copyright (C) 2018-2023 Reacher

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use warp::reject;

/// Catch all error struct for the bulk endpoints
#[derive(Debug)]
pub enum BulkError {
EmptyInput,
Serde(serde_json::Error),
Lapin(lapin::Error),
}

// Defaults to Internal server error
impl reject::Reject for BulkError {}

impl From<serde_json::Error> for BulkError {
fn from(e: serde_json::Error) -> Self {
BulkError::Serde(e)
}
}

impl From<lapin::Error> for BulkError {
fn from(e: lapin::Error) -> Self {
BulkError::Lapin(e)
}
}
2 changes: 2 additions & 0 deletions backend/src/http/v1/bulk/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod error;
pub mod post;
148 changes: 148 additions & 0 deletions backend/src/http/v1/bulk/post.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Reacher - Email Verification
// Copyright (C) 2018-2023 Reacher

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! This file implements the `POST /v1/bulk` endpoint.
use check_if_email_exists::CheckEmailInput;
use check_if_email_exists::CheckEmailInputProxy;
use check_if_email_exists::LOG_TARGET;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use lapin::Channel;
use lapin::{options::*, BasicProperties};
use serde::{Deserialize, Serialize};
use tracing::debug;
use warp::Filter;

use super::error::BulkError;
use crate::check::check_header;
use crate::worker::check_email::CheckEmailPayload;
use crate::worker::check_email::CheckEmailWebhook;

/// Endpoint request body.
#[derive(Debug, Deserialize)]
struct CreateBulkRequest {
input: Vec<String>,
proxy: Option<CheckEmailInputProxy>,
hello_name: Option<String>,
from_email: Option<String>,
webhook: Option<CheckEmailWebhook>,
}

/// Endpoint response body.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct CreateBulkResponse {
message: String,
}

async fn create_bulk_request(
channel: Channel,
body: CreateBulkRequest,
) -> Result<impl warp::Reply, warp::Rejection> {
if body.input.is_empty() {
return Err(BulkError::EmptyInput.into());
}

let payloads = body.input.iter().map(|email| {
let mut input = CheckEmailInput::new(email.to_string());
if let Some(from_email) = &body.from_email {
input.set_from_email(from_email.clone());
}
if let Some(hello_name) = &body.hello_name {
input.set_hello_name(hello_name.clone());
}
if let Some(proxy) = &body.proxy {
input.set_proxy(proxy.clone());
}

CheckEmailPayload {
input,
webhook: body.webhook.clone(),
}
});

let n = payloads.len();
let stream = futures::stream::iter(payloads);

stream
.map::<Result<_, BulkError>, _>(Ok)
.try_for_each_concurrent(10, |payload| {
let channel = channel.clone();
let properties: lapin::protocol::basic::AMQPProperties = BasicProperties::default()
.with_content_type("application/json".into())
.with_priority(1);

async move {
let payload_u8 = serde_json::to_vec(&payload)?;
let queue_name = "check_email.Smtp"; // TODO We might want to make this configurable.
channel
.basic_publish(
"",
queue_name,
BasicPublishOptions::default(),
&payload_u8,
properties,
)
.await?
.await?;

debug!(target: LOG_TARGET, email=?payload.input.to_email, queue=?queue_name, "Enqueued");

Ok(())
}
})
.await?;

Ok(warp::reply::json(&CreateBulkResponse {
message: format!("Successfully added {n} emails to the queue"),
}))
}

/// Create the `POST /bulk` endpoint.
/// The endpoint accepts list of email address and creates
/// a new job to check them.
pub fn create_bulk_job(
o: Option<Channel>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v1" / "bulk")
.and(warp::post())
.and(check_header())
.and(with_channel(o))
// When accepting a body, we want a JSON body (and to reject huge
// payloads)...
// TODO: Configure max size limit for a bulk job
.and(warp::body::content_length_limit(1024 * 16))
.and(warp::body::json())
.and_then(create_bulk_request)
// View access logs by setting `RUST_LOG=reacher_backend`.
.with(warp::log(LOG_TARGET))
}

/// Warp filter that extracts lapin Channel.
fn with_channel(
o: Option<Channel>,
) -> impl Filter<Extract = (Channel,), Error = warp::Rejection> + Clone {
warp::any().and_then(move || {
let o = o.clone();
async move {
if let Some(channel) = o {
Ok(channel)
} else {
Err(warp::reject::not_found())
}
}
})
}
2 changes: 2 additions & 0 deletions backend/src/http/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "worker")]
pub mod bulk;
25 changes: 23 additions & 2 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
//! Main entry point of the `reacher_backend` binary. It has two `main`
//! functions, depending on whether the `bulk` feature is enabled or not.
#[cfg(feature = "worker")]
use std::env;

use check_if_email_exists::LOG_TARGET;
#[cfg(feature = "worker")]
use futures::try_join;
use tracing::info;

#[cfg(feature = "worker")]
use reacher_backend::worker::create_channel;
#[cfg(feature = "worker")]
use reacher_backend::worker::run_worker;
use reacher_backend::{
Expand All @@ -41,10 +46,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Setup sentry bug tracking.
let _guard: sentry::ClientInitGuard = setup_sentry();

let _http_server = run_warp_server();
#[cfg(feature = "worker")]
let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set");

#[cfg(feature = "worker")]
let channel = { Some(create_channel(&backend_name).await?) };
#[cfg(not(feature = "worker"))]
let channel = None;

let _http_server = run_warp_server(channel.clone());

#[cfg(feature = "worker")]
try_join!(_http_server, run_worker())?;
try_join!(
_http_server,
run_worker(
channel.expect("If worker feature is set, channel is set."),
&backend_name
)
)?;
#[cfg(not(feature = "worker"))]
_http_server.await?;

Ok(())
}
4 changes: 2 additions & 2 deletions backend/src/worker/check_email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use tracing::{debug, info};

use crate::check::check_email;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct CheckEmailPayload {
pub input: CheckEmailInput,
pub webhook: Option<CheckEmailWebhook>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CheckEmailWebhook {
pub url: String,
pub extra: serde_json::Value,
Expand Down
Loading

0 comments on commit d9302d4

Please sign in to comment.