Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
Add background worker to send emails
Browse files Browse the repository at this point in the history
  • Loading branch information
0rzech committed Apr 15, 2024
1 parent 1704f86 commit 6cae6d8
Show file tree
Hide file tree
Showing 17 changed files with 439 additions and 95 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ fake = "2.9.2"
linkify = "0.10.0"
proptest = "1.4.0"
serde_json = "1.0.114"
serde_urlencoded = "0.7.1"
tokio = { version = "1.36.0", features = ["macros", "rt"] }
wiremock = "0.6.0"

Expand Down
8 changes: 8 additions & 0 deletions migrations/20240414064846_create_newsletter_issues_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE newsletter_issues (
newsletter_issue_id uuid NOT NULL,
title TEXT NOT NULL,
text_content TEXT NOT NULL,
html_content TEXT NOT NULL,
published_at TEXT NOT NULL,
PRIMARY KEY(newsletter_issue_id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE issue_delivery_queue (
newsletter_issue_id uuid NOT NULL
REFERENCES newsletter_issues (newsletter_issue_id),
subscriber_email TEXT NOT NULL,
PRIMARY KEY (newsletter_issue_id, subscriber_email)
);
19 changes: 14 additions & 5 deletions src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::domain::SubscriberEmail;
use crate::{domain::SubscriberEmail, email_client::EmailClient};
use secrecy::{ExposeSecret, Secret};
use serde::Deserialize;
use serde_aux::field_attributes::deserialize_number_from_string;
Expand All @@ -9,14 +9,14 @@ use sqlx::{
use std::time::Duration;
use tracing_log::log::LevelFilter;

#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct Settings {
pub application: ApplicationSettings,
pub database: DatabaseSettings,
pub email_client: EmailClientSettings,
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct ApplicationSettings {
pub host: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
Expand All @@ -26,7 +26,7 @@ pub struct ApplicationSettings {
pub redis_uri: Secret<String>,
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct DatabaseSettings {
pub host: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
Expand Down Expand Up @@ -60,7 +60,7 @@ impl DatabaseSettings {
}
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct EmailClientSettings {
pub base_url: String,
sender_email: String,
Expand All @@ -69,6 +69,15 @@ pub struct EmailClientSettings {
}

impl EmailClientSettings {
pub fn client(&self) -> EmailClient {
EmailClient::new(
self.base_url.clone(),
self.sender().expect("Invalid sender email address"),
self.authorization_token.clone(),
self.timeout(),
)
}

pub fn sender(&self) -> Result<SubscriberEmail, String> {
SubscriberEmail::parse(self.sender_email.clone())
}
Expand Down
155 changes: 155 additions & 0 deletions src/issue_delivery_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use crate::{
configuration::Settings, domain::SubscriberEmail, email_client::EmailClient,
startup::get_pg_connection_pool,
};
use sqlx::{Executor, PgPool, Postgres, Row, Transaction};
use std::time::Duration;
use tracing::Span;
use uuid::Uuid;

pub async fn run_worker_until_stopped(config: Settings) -> Result<(), anyhow::Error> {
let connection_pool = get_pg_connection_pool(&config.database);
let email_client = config.email_client.client();
worker_loop(&connection_pool, &email_client).await
}

async fn worker_loop(db_pool: &PgPool, email_client: &EmailClient) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(db_pool, email_client).await {
Ok(ExecutionOutcome::TaskCompleted) => {}
Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await,
Err(_) => tokio::time::sleep(Duration::from_secs(1)).await,
}
}
}

#[tracing::instrument(
skip_all,
fields(
newsletter_issue_id=tracing::field::Empty,
subscriber_email=tracing::field::Empty),
err
)]
pub async fn try_execute_task(
db_pool: &PgPool,
email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
if let Some((transaction, issue_id, email)) = dequeue_task(db_pool).await? {
Span::current()
.record("newsletter_issue_id", issue_id.to_string())
.record("subscriber_email", email.clone());

match SubscriberEmail::parse(email.clone()) {
Ok(email) => {
let issue = get_issue(db_pool, issue_id).await?;
if let Err(e) = email_client
.send_email(
&email,
&issue.title,
&issue.html_content,
&issue.text_content,
)
.await
{
tracing::error!(
error_cause_chain = ?e,
error.message = %e,
"Failed to deliver issue to a confirmed subscriber. Skipping."
);
}
}
Err(e) => {
tracing::error!(
error_cause_chain = ?e,
error.message = %e,
"Failed to deliver issue to a confirmed subscriber. \
Their email is invalid."
);
}
}

delete_task(transaction, issue_id, &email).await?;

Ok(ExecutionOutcome::TaskCompleted)
} else {
Ok(ExecutionOutcome::EmptyQueue)
}
}

type PgTransaction = Transaction<'static, Postgres>;

#[tracing::instrument(skip_all)]
async fn dequeue_task(
db_pool: &PgPool,
) -> Result<Option<(PgTransaction, Uuid, String)>, anyhow::Error> {
let mut transaction = db_pool.begin().await?;
let query = sqlx::query!(
r#"
SELECT newsletter_issue_id, subscriber_email
FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
LIMIT 1
"#,
);

match transaction.fetch_optional(query).await? {
Some(row) => Ok(Some((
transaction,
row.try_get("newsletter_issue_id")?,
row.try_get("subscriber_email")?,
))),
None => Ok(None),
}
}

#[tracing::instrument(skip_all)]
async fn delete_task(
mut transaction: PgTransaction,
issue_id: Uuid,
email: &str,
) -> Result<(), anyhow::Error> {
let query = sqlx::query!(
r#"
DELETE FROM issue_delivery_queue
WHERE
newsletter_issue_id = $1 AND
subscriber_email = $2
"#,
issue_id,
email
);

transaction.execute(query).await?;
transaction.commit().await?;

Ok(())
}

#[tracing::instrument(skip_all)]
async fn get_issue(db_pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, anyhow::Error> {
let issue = sqlx::query_as!(
NewsletterIssue,
r#"
SELECT title, text_content, html_content
FROM newsletter_issues
WHERE newsletter_issue_id = $1
"#,
issue_id,
)
.fetch_one(db_pool)
.await?;

Ok(issue)
}

pub enum ExecutionOutcome {
TaskCompleted,
EmptyQueue,
}

struct NewsletterIssue {
title: String,
text_content: String,
html_content: String,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod issue_delivery_worker;
pub mod request_id;
pub mod routes;
pub mod session_state;
Expand Down
Loading

0 comments on commit 6cae6d8

Please sign in to comment.