Skip to content

Commit

Permalink
feat(backend): Add reply-to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
amaury1093 committed Dec 11, 2023
1 parent ea80690 commit aaea59f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
54 changes: 23 additions & 31 deletions backend/src/worker/check_email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,23 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use check_if_email_exists::CheckEmailInput;
use check_if_email_exists::LOG_TARGET;
use check_if_email_exists::{CheckEmailInput, CheckEmailOutput};
use lapin::message::Delivery;
use lapin::options::*;
use lapin::{options::*, BasicProperties, Channel};
use serde::Deserialize;
use serde::Serialize;
use tracing::{debug, info};

use crate::check::check_email;

#[derive(Debug, Deserialize)]
pub struct CheckEmailPayload {
pub input: CheckEmailInput,
pub webhook: Option<CheckEmailWebhook>,
}

#[derive(Debug, Deserialize)]
pub struct CheckEmailWebhook {
pub url: String,
pub extra: serde_json::Value,
}

#[derive(Debug, Serialize)]
struct WebhookOutput {
output: CheckEmailOutput,
extra: serde_json::Value,
}

/// Processes the check email task asynchronously.
pub async fn process_check_email(
channel: &Channel,
delivery: Delivery,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let payload = serde_json::from_slice::<CheckEmailPayload>(&delivery.data)?;
Expand All @@ -53,24 +40,29 @@ pub async fn process_check_email(
let output = check_email(payload.input).await;
debug!(target: LOG_TARGET, email=output.input,output=?output, "Done check-if-email-exists");

// Check if we have a webhook to send the output to.
if let Some(webhook) = payload.webhook {
let webhook_output = WebhookOutput {
output,
extra: webhook.extra,
};
let reply_payload = serde_json::to_string(&output)?;
let reply_payload = reply_payload.as_bytes();

// Send reply by following this guide:
// https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html
if let (Some(reply_to), Some(correlation_id)) = (
delivery.properties.reply_to(),
delivery.properties.correlation_id(),
) {
let properties = BasicProperties::default()
.with_correlation_id(correlation_id.to_owned())
.with_content_type("application/json".into());

let client = reqwest::Client::new();
let res = client
.post(webhook.url)
.json(&webhook_output)
.header("x-reacher-secret", std::env::var("RCH_HEADER_SECRET")?)
.send()
channel
.basic_publish(
"",
reply_to.as_str(),
BasicPublishOptions::default(),
reply_payload,
properties,
)
.await?
.text()
.await?;
debug!(target: LOG_TARGET, email=?webhook_output.output.input,res=?res, "Received webhook response");
info!(target: LOG_TARGET, email=?webhook_output.output.input, is_reachable=?webhook_output.output.is_reachable, "Finished check");
}

delivery.ack(BasicAckOptions::default()).await?;
Expand Down
12 changes: 11 additions & 1 deletion backend/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ pub async fn run_worker() -> Result<(), Box<dyn std::error::Error + Send + Sync>

// Receive channel
let channel = conn.create_channel().await?;
channel
.basic_qos(
env::var("RCH_WORKER_CONCURRENCY")
.ok()
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(10),
BasicQosOptions { global: false },
)
.await?;
info!(target: LOG_TARGET, backend=?backend_name,state=?conn.status().state(), "Connected to AMQP broker");

// Create queue "check_email.{Smtp,Headless}" with priority.
Expand Down Expand Up @@ -75,8 +84,9 @@ pub async fn run_worker() -> Result<(), Box<dyn std::error::Error + Send + Sync>

while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery {
let channel = channel.clone();
tokio::spawn(async move {
let res = process_check_email(delivery).await;
let res = process_check_email(&channel, delivery).await;
if let Err(err) = res {
error!(target: LOG_TARGET, error=?err, "Error processing message");
}
Expand Down

0 comments on commit aaea59f

Please sign in to comment.