diff --git a/.sqlx/query-06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a.json b/.sqlx/query-06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a.json new file mode 100644 index 0000000..c63606e --- /dev/null +++ b/.sqlx/query-06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT newsletter_issue_id, subscriber_email\n FROM issue_delivery_queue\n FOR UPDATE\n SKIP LOCKED\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "newsletter_issue_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "subscriber_email", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a" +} diff --git a/.sqlx/query-269b8424cd193d5fc98bc829a4e15fd313a8ed7401d03c4c72da624a929aa32e.json b/.sqlx/query-269b8424cd193d5fc98bc829a4e15fd313a8ed7401d03c4c72da624a929aa32e.json deleted file mode 100644 index a7365cc..0000000 --- a/.sqlx/query-269b8424cd193d5fc98bc829a4e15fd313a8ed7401d03c4c72da624a929aa32e.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT email\n FROM subscriptions\n WHERE status = $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "email", - "type_info": "Text" - } - ], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [ - false - ] - }, - "hash": "269b8424cd193d5fc98bc829a4e15fd313a8ed7401d03c4c72da624a929aa32e" -} diff --git a/.sqlx/query-43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46.json b/.sqlx/query-43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46.json new file mode 100644 index 0000000..193d3c4 --- /dev/null +++ b/.sqlx/query-43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT title, text_content, html_content\n FROM newsletter_issues\n WHERE newsletter_issue_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "title", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "text_content", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "html_content", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46" +} diff --git a/.sqlx/query-794c0ce1ab5e766961132366163df7a7183ae7985228bf585700250deb38b726.json b/.sqlx/query-794c0ce1ab5e766961132366163df7a7183ae7985228bf585700250deb38b726.json new file mode 100644 index 0000000..bdac549 --- /dev/null +++ b/.sqlx/query-794c0ce1ab5e766961132366163df7a7183ae7985228bf585700250deb38b726.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO newsletter_issues (\n newsletter_issue_id,\n title,\n text_content,\n html_content,\n published_at\n )\n VALUES ($1, $2, $3, $4, now())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "794c0ce1ab5e766961132366163df7a7183ae7985228bf585700250deb38b726" +} diff --git a/.sqlx/query-9341e1139459e8f21883417b57ca8421442532b40de510bae5880a24476753ef.json b/.sqlx/query-9341e1139459e8f21883417b57ca8421442532b40de510bae5880a24476753ef.json new file mode 100644 index 0000000..27f6749 --- /dev/null +++ b/.sqlx/query-9341e1139459e8f21883417b57ca8421442532b40de510bae5880a24476753ef.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM issue_delivery_queue\n WHERE\n newsletter_issue_id = $1 AND\n subscriber_email = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "9341e1139459e8f21883417b57ca8421442532b40de510bae5880a24476753ef" +} diff --git a/.sqlx/query-9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d.json b/.sqlx/query-9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d.json new file mode 100644 index 0000000..93e684b --- /dev/null +++ b/.sqlx/query-9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO issue_delivery_queue (\n newsletter_issue_id,\n subscriber_email\n )\n SELECT $1, email\n FROM subscriptions\n WHERE status = 'confirmed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d" +} diff --git a/Cargo.lock b/Cargo.lock index c2ed69b..25be8e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3504,6 +3504,7 @@ dependencies = [ "serde", "serde-aux", "serde_json", + "serde_urlencoded", "sqlx", "thiserror", "time", diff --git a/Cargo.toml b/Cargo.toml index f09de13..8fa50a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ fake = "2.9.2" linkify = "0.10.0" proptest = "1.4.0" serde_json = "1.0.114" +serde_urlencoded = "0.7.1" tokio = { version = "1.36.0", features = ["macros", "rt"] } wiremock = "0.6.0" diff --git a/migrations/20240414064846_create_newsletter_issues_table.sql b/migrations/20240414064846_create_newsletter_issues_table.sql new file mode 100644 index 0000000..67d58a3 --- /dev/null +++ b/migrations/20240414064846_create_newsletter_issues_table.sql @@ -0,0 +1,8 @@ +CREATE TABLE newsletter_issues ( + newsletter_issue_id uuid NOT NULL, + title TEXT NOT NULL, + text_content TEXT NOT NULL, + html_content TEXT NOT NULL, + published_at TEXT NOT NULL, + PRIMARY KEY(newsletter_issue_id) +); diff --git a/migrations/20240414065943_create_issue_delivery_queue_table.sql b/migrations/20240414065943_create_issue_delivery_queue_table.sql new file mode 100644 index 0000000..dd7aa91 --- /dev/null +++ b/migrations/20240414065943_create_issue_delivery_queue_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE issue_delivery_queue ( + newsletter_issue_id uuid NOT NULL + REFERENCES newsletter_issues (newsletter_issue_id), + subscriber_email TEXT NOT NULL, + PRIMARY KEY (newsletter_issue_id, subscriber_email) +); diff --git a/src/configuration.rs b/src/configuration.rs index 82b276e..569303d 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,4 +1,4 @@ -use crate::domain::SubscriberEmail; +use crate::{domain::SubscriberEmail, email_client::EmailClient}; use secrecy::{ExposeSecret, Secret}; use serde::Deserialize; use serde_aux::field_attributes::deserialize_number_from_string; @@ -9,14 +9,14 @@ use sqlx::{ use std::time::Duration; use tracing_log::log::LevelFilter; -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct Settings { pub application: ApplicationSettings, pub database: DatabaseSettings, pub email_client: EmailClientSettings, } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct ApplicationSettings { pub host: String, #[serde(deserialize_with = "deserialize_number_from_string")] @@ -26,7 +26,7 @@ pub struct ApplicationSettings { pub redis_uri: Secret, } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct DatabaseSettings { pub host: String, #[serde(deserialize_with = "deserialize_number_from_string")] @@ -60,7 +60,7 @@ impl DatabaseSettings { } } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct EmailClientSettings { pub base_url: String, sender_email: String, @@ -69,6 +69,15 @@ pub struct EmailClientSettings { } impl EmailClientSettings { + pub fn client(&self) -> EmailClient { + EmailClient::new( + self.base_url.clone(), + self.sender().expect("Invalid sender email address"), + self.authorization_token.clone(), + self.timeout(), + ) + } + pub fn sender(&self) -> Result { SubscriberEmail::parse(self.sender_email.clone()) } diff --git a/src/issue_delivery_worker.rs b/src/issue_delivery_worker.rs new file mode 100644 index 0000000..c0e821f --- /dev/null +++ b/src/issue_delivery_worker.rs @@ -0,0 +1,155 @@ +use crate::{ + configuration::Settings, domain::SubscriberEmail, email_client::EmailClient, + startup::get_pg_connection_pool, +}; +use sqlx::{Executor, PgPool, Postgres, Row, Transaction}; +use std::time::Duration; +use tracing::Span; +use uuid::Uuid; + +pub async fn run_worker_until_stopped(config: Settings) -> Result<(), anyhow::Error> { + let connection_pool = get_pg_connection_pool(&config.database); + let email_client = config.email_client.client(); + worker_loop(&connection_pool, &email_client).await +} + +async fn worker_loop(db_pool: &PgPool, email_client: &EmailClient) -> Result<(), anyhow::Error> { + loop { + match try_execute_task(db_pool, email_client).await { + Ok(ExecutionOutcome::TaskCompleted) => {} + Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await, + Err(_) => tokio::time::sleep(Duration::from_secs(1)).await, + } + } +} + +#[tracing::instrument( + skip_all, + fields( + newsletter_issue_id=tracing::field::Empty, + subscriber_email=tracing::field::Empty), + err +)] +pub async fn try_execute_task( + db_pool: &PgPool, + email_client: &EmailClient, +) -> Result { + if let Some((transaction, issue_id, email)) = dequeue_task(db_pool).await? { + Span::current() + .record("newsletter_issue_id", issue_id.to_string()) + .record("subscriber_email", email.clone()); + + match SubscriberEmail::parse(email.clone()) { + Ok(email) => { + let issue = get_issue(db_pool, issue_id).await?; + if let Err(e) = email_client + .send_email( + &email, + &issue.title, + &issue.html_content, + &issue.text_content, + ) + .await + { + tracing::error!( + error_cause_chain = ?e, + error.message = %e, + "Failed to deliver issue to a confirmed subscriber. Skipping." + ); + } + } + Err(e) => { + tracing::error!( + error_cause_chain = ?e, + error.message = %e, + "Failed to deliver issue to a confirmed subscriber. \ + Their email is invalid." + ); + } + } + + delete_task(transaction, issue_id, &email).await?; + + Ok(ExecutionOutcome::TaskCompleted) + } else { + Ok(ExecutionOutcome::EmptyQueue) + } +} + +type PgTransaction = Transaction<'static, Postgres>; + +#[tracing::instrument(skip_all)] +async fn dequeue_task( + db_pool: &PgPool, +) -> Result, anyhow::Error> { + let mut transaction = db_pool.begin().await?; + let query = sqlx::query!( + r#" + SELECT newsletter_issue_id, subscriber_email + FROM issue_delivery_queue + FOR UPDATE + SKIP LOCKED + LIMIT 1 + "#, + ); + + match transaction.fetch_optional(query).await? { + Some(row) => Ok(Some(( + transaction, + row.try_get("newsletter_issue_id")?, + row.try_get("subscriber_email")?, + ))), + None => Ok(None), + } +} + +#[tracing::instrument(skip_all)] +async fn delete_task( + mut transaction: PgTransaction, + issue_id: Uuid, + email: &str, +) -> Result<(), anyhow::Error> { + let query = sqlx::query!( + r#" + DELETE FROM issue_delivery_queue + WHERE + newsletter_issue_id = $1 AND + subscriber_email = $2 + "#, + issue_id, + email + ); + + transaction.execute(query).await?; + transaction.commit().await?; + + Ok(()) +} + +#[tracing::instrument(skip_all)] +async fn get_issue(db_pool: &PgPool, issue_id: Uuid) -> Result { + let issue = sqlx::query_as!( + NewsletterIssue, + r#" + SELECT title, text_content, html_content + FROM newsletter_issues + WHERE newsletter_issue_id = $1 + "#, + issue_id, + ) + .fetch_one(db_pool) + .await?; + + Ok(issue) +} + +pub enum ExecutionOutcome { + TaskCompleted, + EmptyQueue, +} + +struct NewsletterIssue { + title: String, + text_content: String, + html_content: String, +} diff --git a/src/lib.rs b/src/lib.rs index d41f42f..311ed6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod configuration; pub mod domain; pub mod email_client; pub mod idempotency; +pub mod issue_delivery_worker; pub mod request_id; pub mod routes; pub mod session_state; diff --git a/src/main.rs b/src/main.rs index 115cafb..ad11459 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,8 @@ +use std::fmt::{Debug, Display}; +use tokio::task::JoinError; use zero2prod::{ configuration::get_configuration, + issue_delivery_worker::run_worker_until_stopped, startup::Application, telemetry::{get_subscriber, init_subscriber}, }; @@ -10,7 +13,25 @@ async fn main() -> Result<(), anyhow::Error> { init_subscriber(subscriber); let config = get_configuration().expect("Failed to read configuration"); - let app = Application::build(config).await; + let app = tokio::spawn(Application::build(config.clone()).await.run_until_stopped()); + let worker = tokio::spawn(run_worker_until_stopped(config)); - app.run_until_stopped().await + tokio::select! { + o = app => report_exit("API", o), + o = worker => report_exit("Background worker", o), + }; + + Ok(()) +} + +fn report_exit(task_name: &str, outcome: Result, JoinError>) { + match outcome { + Ok(Ok(())) => tracing::info!("{task_name} has exited"), + Ok(Err(e)) => { + tracing::error!(eror.cause_chain = ?e, error.message = %e, "{task_name} failed") + } + Err(e) => { + tracing::error!(eror.cause_chain = ?e, error.message = %e, "{task_name} task failed to complete") + } + } } diff --git a/src/routes/admin/newsletters/post.rs b/src/routes/admin/newsletters/post.rs index d8cd9b5..edd0df0 100644 --- a/src/routes/admin/newsletters/post.rs +++ b/src/routes/admin/newsletters/post.rs @@ -1,7 +1,6 @@ use crate::{ app_state::AppState, authentication::extract::SessionUserId, - domain::{SubscriberEmail, SubscriptionStatus}, idempotency::{save_response, try_processing, IdempotencyKey, NextAction}, utils::{e422, e500, HttpError}, }; @@ -10,55 +9,45 @@ use askama_axum::IntoResponse; use axum::{body::Body, extract::State, http::Response, response::Redirect, Form}; use axum_messages::Messages; use serde::Deserialize; -use sqlx::PgPool; +use sqlx::{Executor, Postgres, Transaction}; +use uuid::Uuid; -#[tracing::instrument(skip(app_state, user_id, messages, form))] +#[tracing::instrument(skip_all, fields(user_id=%user_id))] pub(in crate::routes::admin) async fn publish_newsletter( State(app_state): State, SessionUserId(user_id): SessionUserId, messages: Messages, Form(form): Form, ) -> Result, HttpError> { - let flash_success = || messages.info("Newsletter sent!"); let idempotency_key: IdempotencyKey = form.idempotency_key.try_into().map_err(e422)?; - let transaction = match try_processing(&app_state.db_pool, &idempotency_key, user_id) + let mut transaction = match try_processing(&app_state.db_pool, &idempotency_key, user_id) .await .map_err(e500)? { NextAction::StartProcessing(transaction) => transaction, NextAction::ReturnSavedResponse(saved_response) => { - flash_success(); + success_message(messages); return Ok(saved_response); } }; - for subscriber in get_confirmed_subscribers(&app_state.db_pool) + let issue_id = insert_newsletter_issue( + &mut transaction, + &form.title, + &form.text_content, + &form.html_content, + ) + .await + .context("Failed to store newsletter issue details") + .map_err(e500)?; + + enqueue_delivery_tasks(&mut transaction, issue_id) .await - .map_err(e500)? - { - match subscriber { - Ok(subscriber) => app_state - .email_client - .send_email( - &subscriber.email, - &form.title, - &form.html_content, - &form.text_content, - ) - .await - .with_context(|| { - format!("Failed to send newsletter issue to {}", subscriber.email) - })?, - Err(e) => tracing::warn!( - e.cause_chain = ?e, - "Skipping a confirmed subscriber. \ - Ther stored contact details are invalid" - ), - } - } + .context("Failed to enqueue delivery tasks") + .map_err(e500)?; - flash_success(); + success_message(messages); let response = Redirect::to("/admin/newsletters").into_response(); let response = save_response(transaction, &idempotency_key, user_id, response).await?; @@ -66,30 +55,57 @@ pub(in crate::routes::admin) async fn publish_newsletter( Ok(response) } -#[tracing::instrument(skip(db_pool))] -async fn get_confirmed_subscribers( - db_pool: &PgPool, -) -> Result>, anyhow::Error> { - let subscribers = sqlx::query!( +#[tracing::instrument(skip_all)] +async fn insert_newsletter_issue( + transaction: &mut Transaction<'_, Postgres>, + title: &str, + text_content: &str, + html_content: &str, +) -> Result { + let newsletter_issue_id = Uuid::new_v4(); + let query = sqlx::query!( r#" - SELECT email + INSERT INTO newsletter_issues ( + newsletter_issue_id, + title, + text_content, + html_content, + published_at + ) + VALUES ($1, $2, $3, $4, now()) + "#, + newsletter_issue_id, + title, + text_content, + html_content + ); + + transaction.execute(query).await?; + + Ok(newsletter_issue_id) +} + +#[tracing::instrument(skip_all)] +async fn enqueue_delivery_tasks( + transaction: &mut Transaction<'_, Postgres>, + newsletter_issue_id: Uuid, +) -> Result<(), sqlx::Error> { + let query = sqlx::query!( + r#" + INSERT INTO issue_delivery_queue ( + newsletter_issue_id, + subscriber_email + ) + SELECT $1, email FROM subscriptions - WHERE status = $1 + WHERE status = 'confirmed' "#, - SubscriptionStatus::Confirmed.as_ref(), - ) - .fetch_all(db_pool) - .await - .map(|rows| { - rows.into_iter() - .map(|row| match SubscriberEmail::parse(row.email) { - Ok(email) => Ok(ConfirmedSubscriber { email }), - Err(e) => Err(anyhow::anyhow!(e)), - }) - .collect() - })?; + newsletter_issue_id, + ); + + transaction.execute(query).await?; - Ok(subscribers) + Ok(()) } #[derive(Deserialize)] @@ -100,6 +116,9 @@ pub(in crate::routes::admin) struct FormData { idempotency_key: String, } -struct ConfirmedSubscriber { - email: SubscriberEmail, +fn success_message(messages: Messages) { + messages.info( + "The newsletter issue has been accepted \ + - emails will go out shortly.", + ); } diff --git a/tests/api/admin_newsletters.rs b/tests/api/admin_newsletters.rs index bb361aa..79debbf 100644 --- a/tests/api/admin_newsletters.rs +++ b/tests/api/admin_newsletters.rs @@ -1,10 +1,14 @@ use crate::helpers::{assert_redirect_to, ConfirmationLinks, TestApp}; +use fake::{ + faker::{internet::en::SafeEmail, name::en::Name}, + Fake, +}; use serde_json::json; use std::time::Duration; use uuid::Uuid; use wiremock::{ matchers::{any, method, path}, - Mock, ResponseTemplate, + Mock, MockBuilder, ResponseTemplate, }; #[tokio::test] @@ -19,8 +23,7 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { }); create_confirmed_subscriber(&app).await; - Mock::given(path("/email")) - .and(method("POST")) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount(&app.email_server) @@ -39,6 +42,8 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { // then assert_redirect_to(&response, "/admin/newsletters"); + + app.dispatch_all_pending_emails().await; } #[tokio::test] @@ -167,8 +172,7 @@ async fn newsletter_creation_is_idempotent() { app.log_in(&app.test_user.username, &app.test_user.password) .await; - Mock::given(path("/email")) - .and(method("POST")) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount(&app.email_server) @@ -178,7 +182,11 @@ async fn newsletter_creation_is_idempotent() { assert_redirect_to(&response, "/admin/newsletters"); let html_page = app.get_newsletter_form_html().await; - assert!(html_page.contains("

Newsletter sent!

")); + assert!(html_page.contains( + "

The newsletter issue has been accepted \ + - emails will go out shortly.

" + )); + app.dispatch_all_pending_emails().await; // when let response = app.post_publish_newsletter(&newsletter_request_body).await; @@ -186,7 +194,10 @@ async fn newsletter_creation_is_idempotent() { // then assert_redirect_to(&response, "/admin/newsletters"); let html_page = app.get_newsletter_form_html().await; - assert!(html_page.contains("

Newsletter sent!

")); + assert!(html_page.contains( + "

The newsletter issue has been accepted \ + - emails will go out shortly.

" + )); } #[tokio::test] @@ -203,8 +214,7 @@ async fn concurrent_form_submission_is_handled_gracefully() { create_confirmed_subscriber(&app).await; app.log_in(&app.test_user.username, &app.test_user.password) .await; - - Mock::given(path("/email")) + when_sending_an_email() .and(method("POST")) .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2))) .expect(1) @@ -223,19 +233,26 @@ async fn concurrent_form_submission_is_handled_gracefully() { response1.text().await.unwrap(), response2.text().await.unwrap() ); + + app.dispatch_all_pending_emails().await; } async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { - let body = "name=Imi%C4%99%20Nazwisko&email=imie.nazwisko%40example.com"; - - let _mock_guard_ = Mock::given(path("/email")) - .and(method("POST")) + let name: String = Name().fake(); + let email: String = SafeEmail().fake(); + let body = serde_urlencoded::to_string(json!({ + "name": name, + "email": email, + })) + .unwrap(); + + let _mock_guard_ = when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount_as_scoped(&app.email_server) .await; - app.post_subscriptions(body.into()) + app.post_subscriptions(body) .await .error_for_status() .unwrap(); @@ -258,3 +275,7 @@ async fn create_confirmed_subscriber(app: &TestApp) { .error_for_status() .unwrap(); } + +fn when_sending_an_email() -> MockBuilder { + Mock::given(path("/email")).and(method("POST")) +} diff --git a/tests/api/helpers.rs b/tests/api/helpers.rs index feb6038..d37d26f 100644 --- a/tests/api/helpers.rs +++ b/tests/api/helpers.rs @@ -11,6 +11,8 @@ use uuid::Uuid; use wiremock::MockServer; use zero2prod::{ configuration::{get_configuration, DatabaseSettings}, + email_client::EmailClient, + issue_delivery_worker::{try_execute_task, ExecutionOutcome}, startup::{get_pg_connection_pool, Application}, telemetry::{get_subscriber, init_subscriber}, }; @@ -31,6 +33,7 @@ pub struct TestApp { pub address: SocketAddr, pub db_pool: PgPool, pub email_server: MockServer, + pub email_client: EmailClient, pub test_user: TestUser, client: reqwest::Client, } @@ -48,6 +51,7 @@ impl TestApp { let db_pool = configure_database(&config.database).await; let email_server = MockServer::start().await; config.email_client.base_url = email_server.uri(); + let email_client = config.email_client.client(); let app = Application::build(config).await; let address = app.local_addr(); @@ -67,11 +71,24 @@ impl TestApp { address, db_pool, email_server, + email_client, test_user, client, } } + pub async fn dispatch_all_pending_emails(&self) { + loop { + if let ExecutionOutcome::EmptyQueue = + try_execute_task(&self.db_pool, &self.email_client) + .await + .unwrap() + { + break; + } + } + } + pub async fn get_health_check(&self) -> reqwest::Response { self.client .get(self.url("/health_check")) @@ -237,6 +254,7 @@ impl TestApp { .await .expect(Self::FAILED_TO_EXECUTE_REQUEST) } + fn url(&self, endpoint: &str) -> String { format!("http://{}{endpoint}", self.address) }