From 4de3c51a4a8cd34a090cb3bb3b4b9890e85f49e8 Mon Sep 17 00:00:00 2001 From: Piotr Orzechowski Date: Tue, 27 Feb 2024 11:23:27 +0100 Subject: [PATCH] Add single email sending --- Cargo.lock | 119 +++++++++++++------ Cargo.toml | 5 +- configuration/base.yaml | 5 + configuration/production.yaml | 3 + src/app_state.rs | 8 ++ src/configuration.rs | 21 ++++ src/domain/subscriber_email.rs | 2 +- src/email_client.rs | 205 +++++++++++++++++++++++++++++++++ src/lib.rs | 2 + src/main.rs | 16 ++- src/routes/health_check.rs | 4 +- src/routes/subscriptions.rs | 13 ++- src/startup.rs | 15 ++- tests/util/mod.rs | 15 ++- 14 files changed, 384 insertions(+), 49 deletions(-) create mode 100644 src/app_state.rs create mode 100644 src/email_client.rs diff --git a/Cargo.lock b/Cargo.lock index 429ba30..b550cc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -367,6 +377,24 @@ dependencies = [ "typenum", ] +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" + [[package]] name = "der" version = "0.7.8" @@ -535,6 +563,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -579,6 +622,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -597,8 +651,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -865,19 +921,7 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", -] - -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes", - "hyper 0.14.28", - "native-tls", - "tokio", - "tokio-native-tls", + "want", ] [[package]] @@ -1594,23 +1638,19 @@ dependencies = [ "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", - "hyper-tls", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "system-configuration", "tokio", - "tokio-native-tls", "tower-service", "url", "wasm-bindgen", @@ -1680,15 +1720,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64", -] - [[package]] name = "rustversion" version = "1.0.14" @@ -2332,16 +2363,6 @@ dependencies = [ "syn 2.0.50", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-stream" version = "0.1.14" @@ -2934,6 +2955,30 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wiremock" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec874e1eef0df2dcac546057fe5e29186f09c378181cd7b635b4b7bcc98e9d81" +dependencies = [ + "assert-json-diff", + "async-trait", + "base64", + "deadpool", + "futures", + "http 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "yaml-rust" version = "0.4.5" @@ -2957,6 +3002,7 @@ dependencies = [ "secrecy", "serde", "serde-aux", + "serde_json", "sqlx", "time", "tokio", @@ -2969,6 +3015,7 @@ dependencies = [ "unicode-segmentation", "uuid", "validator", + "wiremock", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a5907c3..637e6c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ name = "zero2prod" axum = "0.7.4" config = "0.14.0" once_cell = "1.19.0" +reqwest = { version = "0.11.24", features = ["json"], default-features = false } secrecy = { version = "0.8.0", features = ["serde"] } serde = { version = "1.0.196", features = ["derive"] } serde-aux = { version = "4.4.0", default-features = false } @@ -36,4 +37,6 @@ validator = "0.16.1" claims = "0.7.1" fake = "2.9.2" proptest = "1.4.0" -reqwest = "0.11.24" +serde_json = "1.0.114" +tokio = { version = "1.36.0", features = ["macros", "rt"] } +wiremock = "0.6.0" diff --git a/configuration/base.yaml b/configuration/base.yaml index f938817..7fbb048 100644 --- a/configuration/base.yaml +++ b/configuration/base.yaml @@ -6,3 +6,8 @@ database: username: postgres password: password database_name: newsletter +email_client: + base_url: localhost + sender_email: test@orzechowski.tech + authorization_token: my-secret-token + timeout_milliseconds: 10000 diff --git a/configuration/production.yaml b/configuration/production.yaml index aa64e88..7b5e29d 100644 --- a/configuration/production.yaml +++ b/configuration/production.yaml @@ -2,3 +2,6 @@ application: host: ::0 database: require_ssl: true +email_client: + base_url: https://api.postmarkapp.com + sender_email: zero2prod@orzechowski.tech diff --git a/src/app_state.rs b/src/app_state.rs new file mode 100644 index 0000000..56c01d8 --- /dev/null +++ b/src/app_state.rs @@ -0,0 +1,8 @@ +use crate::email_client::EmailClient; +use sqlx::PgPool; + +#[derive(Clone)] +pub struct AppState { + pub db_pool: PgPool, + pub email_client: EmailClient, +} diff --git a/src/configuration.rs b/src/configuration.rs index 51886fa..321c189 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,3 +1,4 @@ +use crate::domain::SubscriberEmail; use secrecy::{ExposeSecret, Secret}; use serde::Deserialize; use serde_aux::field_attributes::deserialize_number_from_string; @@ -5,12 +6,14 @@ use sqlx::{ postgres::{PgConnectOptions, PgSslMode}, ConnectOptions, }; +use std::time::Duration; use tracing_log::log::LevelFilter; #[derive(Deserialize)] pub struct Settings { pub application: ApplicationSettings, pub database: DatabaseSettings, + pub email_client: EmailClientSettings, } #[derive(Deserialize)] @@ -54,6 +57,24 @@ impl DatabaseSettings { } } +#[derive(Deserialize)] +pub struct EmailClientSettings { + pub base_url: String, + sender_email: String, + pub authorization_token: Secret, + pub timeout_milliseconds: u64, +} + +impl EmailClientSettings { + pub fn sender(&self) -> Result { + SubscriberEmail::parse(self.sender_email.clone()) + } + + pub fn timeout(&self) -> Duration { + Duration::from_millis(self.timeout_milliseconds) + } +} + pub fn get_configuration() -> Result { let config_dir = std::env::current_dir() .map(|dir| dir.join("configuration")) diff --git a/src/domain/subscriber_email.rs b/src/domain/subscriber_email.rs index 11be0ff..9b99d87 100644 --- a/src/domain/subscriber_email.rs +++ b/src/domain/subscriber_email.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use validator::validate_email; -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct SubscriberEmail(String); impl SubscriberEmail { diff --git a/src/email_client.rs b/src/email_client.rs new file mode 100644 index 0000000..65c4672 --- /dev/null +++ b/src/email_client.rs @@ -0,0 +1,205 @@ +use crate::domain::SubscriberEmail; +use reqwest::{Client, Error}; +use secrecy::{ExposeSecret, Secret}; +use serde::Serialize; +use std::time::Duration; + +#[derive(Clone)] +pub struct EmailClient { + http_client: Client, + base_url: String, + sender: SubscriberEmail, + authorization_token: Secret, +} + +impl EmailClient { + pub fn new( + base_url: String, + sender: SubscriberEmail, + authorization_token: Secret, + timeout: Duration, + ) -> Self { + let http_client = Client::builder().timeout(timeout).build().unwrap(); + + Self { + http_client, + base_url, + sender, + authorization_token, + } + } + + pub async fn send_email( + &self, + recipient: SubscriberEmail, + subject: &str, + html_content: &str, + text_content: &str, + ) -> Result<(), Error> { + let url = format!("{}/email", &self.base_url); + let request_body = SendEmailRequest { + from: self.sender.as_ref(), + to: recipient.as_ref(), + subject, + html_body: html_content, + text_body: text_content, + }; + + self.http_client + .post(&url) + .header( + "X-Postmark-Server-Token", + self.authorization_token.expose_secret(), + ) + .json(&request_body) + .send() + .await? + .error_for_status()?; + + Ok(()) + } +} + +#[derive(Serialize)] +#[serde(rename_all = "PascalCase")] +struct SendEmailRequest<'a> { + from: &'a str, + to: &'a str, + subject: &'a str, + html_body: &'a str, + text_body: &'a str, +} + +#[cfg(test)] +mod tests { + use claims::{assert_err, assert_ok}; + use matcher::SendEmailBodyMatcher; + use std::time::Duration; + use util::*; + use wiremock::{ + matchers::{any, header, header_exists, method, path}, + Mock, MockServer, ResponseTemplate, + }; + + #[tokio::test] + async fn send_email_sends_the_expected_request() { + // given + let mock_server = MockServer::start().await; + let email_client = email_client(mock_server.uri()); + + Mock::given(header_exists("X-Postmark-Server-Token")) + .and(header("Content-Type", "application/json")) + .and(path("/email")) + .and(method("POST")) + .and(SendEmailBodyMatcher) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&mock_server) + .await; + + // when + let response = email_client + .send_email(email(), &subject(), &content(), &content()) + .await; + + // then + assert_ok!(response); + } + + #[tokio::test] + async fn send_email_fails_if_the_server_returns_500() { + // given + let mock_server = MockServer::start().await; + let email_client = email_client(mock_server.uri()); + + Mock::given(any()) + .respond_with(ResponseTemplate::new(500)) + .expect(1) + .mount(&mock_server) + .await; + + // when + let response = email_client + .send_email(email(), &subject(), &content(), &content()) + .await; + + // then + assert_err!(response); + } + + #[tokio::test] + async fn send_email_times_out_if_the_server_takes_too_long() { + // given + let mock_server = MockServer::start().await; + let email_client = email_client(mock_server.uri()); + + Mock::given(any()) + .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_millis(300))) + .expect(1) + .mount(&mock_server) + .await; + + // when + let response = email_client + .send_email(email(), &subject(), &content(), &content()) + .await; + + // then + assert_err!(response); + } + + mod matcher { + use serde_json::{from_slice, Value}; + use wiremock::{Match, Request}; + + pub struct SendEmailBodyMatcher; + + impl Match for SendEmailBodyMatcher { + fn matches(&self, request: &Request) -> bool { + let result: Result = from_slice(&request.body); + + if let Ok(body) = result { + ["From", "To", "Subject", "HtmlBody", "TextBody"] + .iter() + .all(|&value| body.get(value).is_some()) + } else { + false + } + } + } + } + + mod util { + use crate::{domain::SubscriberEmail, email_client::EmailClient}; + use fake::{ + faker::{ + internet::en::SafeEmail, + lorem::en::{Paragraph, Sentence}, + }, + Fake, Faker, + }; + use secrecy::Secret; + use std::time::Duration; + + pub fn email_client(base_url: String) -> EmailClient { + EmailClient::new( + base_url, + email(), + Secret::new(Faker.fake()), + Duration::from_millis(200), + ) + } + + pub fn email() -> SubscriberEmail { + SubscriberEmail::parse(SafeEmail().fake()).unwrap() + } + + pub fn subject() -> String { + Sentence(1..2).fake() + } + + pub fn content() -> String { + Paragraph(1..10).fake() + } + } +} diff --git a/src/lib.rs b/src/lib.rs index d0c0948..b06a726 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ +pub mod app_state; pub mod configuration; pub mod domain; +pub mod email_client; pub mod request_id; pub mod routes; pub mod startup; diff --git a/src/main.rs b/src/main.rs index 78f9e92..6a49f8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use tokio::net::TcpListener; use zero2prod::{ configuration::get_configuration, + email_client::EmailClient, startup::run, telemetry::{get_subscriber, init_subscriber}, }; @@ -20,5 +21,18 @@ async fn main() -> Result<(), std::io::Error> { let pool = PgPool::connect_lazy_with(config.database.with_db()); - run(listener, pool).await + let sender_email = config + .email_client + .sender() + .expect("Invalid sender email address"); + let timeout = config.email_client.timeout(); + + let email_client = EmailClient::new( + config.email_client.base_url, + sender_email, + config.email_client.authorization_token, + timeout, + ); + + run(listener, pool, email_client).await } diff --git a/src/routes/health_check.rs b/src/routes/health_check.rs index 02544cc..57a3ed8 100644 --- a/src/routes/health_check.rs +++ b/src/routes/health_check.rs @@ -1,7 +1,7 @@ +use crate::app_state::AppState; use axum::{http::StatusCode, routing::get, Router}; -use sqlx::PgPool; -pub fn router() -> Router { +pub fn router() -> Router { Router::new().route("/health_check", get(health_check)) } diff --git a/src/routes/subscriptions.rs b/src/routes/subscriptions.rs index f8514fa..7e7a4a2 100644 --- a/src/routes/subscriptions.rs +++ b/src/routes/subscriptions.rs @@ -1,23 +1,26 @@ -use crate::domain::{NewSubscriber, SubscriberEmail, SubscriberName}; +use crate::{ + app_state::AppState, + domain::{NewSubscriber, SubscriberEmail, SubscriberName}, +}; use axum::{extract::State, http::StatusCode, routing::post, Form, Router}; use serde::Deserialize; use sqlx::PgPool; use time::OffsetDateTime; use uuid::Uuid; -pub fn router() -> Router { +pub fn router() -> Router { Router::new().route("/subscriptions", post(subscribe)) } #[tracing::instrument( name = "Adding new subscriber", - skip(form, db_pool), + skip(form, app_state), fields( subscriber_email = %form.email, subscriber_name = %form.name ) )] -async fn subscribe(State(db_pool): State, Form(form): Form) -> StatusCode { +async fn subscribe(State(app_state): State, Form(form): Form) -> StatusCode { let new_subscriber = match form.try_into() { Ok(subscriber) => subscriber, Err(e) => { @@ -26,7 +29,7 @@ async fn subscribe(State(db_pool): State, Form(form): Form) -> } }; - match insert_subscriber(&new_subscriber, &db_pool).await { + match insert_subscriber(&new_subscriber, &app_state.db_pool).await { Ok(_) => StatusCode::OK, Err(_) => StatusCode::INTERNAL_SERVER_ERROR, } diff --git a/src/startup.rs b/src/startup.rs index e6c95c5..8a5ab69 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -1,4 +1,6 @@ use crate::{ + app_state::AppState, + email_client::EmailClient, request_id::RequestUuid, routes::{health_check, subscriptions}, telemetry::request_span, @@ -13,11 +15,20 @@ use tower_http::{ }; use tracing::Level; -pub async fn run(listener: TcpListener, db_pool: PgPool) -> Result<(), std::io::Error> { +pub async fn run( + listener: TcpListener, + db_pool: PgPool, + email_client: EmailClient, +) -> Result<(), std::io::Error> { + let app_state = AppState { + db_pool, + email_client, + }; + let app = Router::new() .merge(health_check::router()) .merge(subscriptions::router()) - .with_state(db_pool) + .with_state(app_state) .layer( ServiceBuilder::new() .set_x_request_id(RequestUuid) diff --git a/tests/util/mod.rs b/tests/util/mod.rs index c6c9fa9..e63d4fc 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -4,6 +4,7 @@ use std::net::SocketAddr; use uuid::Uuid; use zero2prod::{ configuration::{get_configuration, DatabaseSettings}, + email_client::EmailClient, startup::run, telemetry::{get_subscriber, init_subscriber}, }; @@ -41,8 +42,20 @@ pub async fn spawn_app() -> TestApp { }; let pool = app.db_pool.clone(); + let sender_email = config.email_client.sender().expect("Invalid sender email"); + let timeout = config.email_client.timeout(); + + let email_client = EmailClient::new( + config.email_client.base_url, + sender_email, + config.email_client.authorization_token, + timeout, + ); + tokio::spawn(async move { - run(listener, pool).await.expect("Failed to run server"); + run(listener, pool, email_client) + .await + .expect("Failed to run server"); }); app