diff --git a/.sqlx/query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json b/.sqlx/query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json new file mode 100644 index 0000000..ef59776 --- /dev/null +++ b/.sqlx/query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO idempotency (\n user_id,\n idempotency_key,\n response_status_code,\n response_headers,\n response_body,\n created_at\n )\n VALUES ($1, $2, $3, $4, $5, now())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Int2", + { + "Custom": { + "name": "_header_pair", + "kind": { + "Array": { + "Custom": { + "name": "header_pair", + "kind": { + "Composite": [ + [ + "name", + "Text" + ], + [ + "value", + "Bytea" + ] + ] + } + } + } + } + } + }, + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd" +} diff --git a/.sqlx/query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json b/.sqlx/query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json new file mode 100644 index 0000000..8e8ca05 --- /dev/null +++ b/.sqlx/query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json @@ -0,0 +1,58 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n response_status_code,\n response_headers AS \"response_headers: Vec\",\n response_body\n FROM idempotency\n WHERE\n idempotency_key = $1 AND\n user_id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "response_status_code", + "type_info": "Int2" + }, + { + "ordinal": 1, + "name": "response_headers: Vec", + "type_info": { + "Custom": { + "name": "_header_pair", + "kind": { + "Array": { + "Custom": { + "name": "header_pair", + "kind": { + "Composite": [ + [ + "name", + "Text" + ], + [ + "value", + "Bytea" + ] + ] + } + } + } + } + } + } + }, + { + "ordinal": 2, + "name": "response_body", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489" +} diff --git a/migrations/20240413145104_create_idempotency_table.sql b/migrations/20240413145104_create_idempotency_table.sql new file mode 100644 index 0000000..5d0405f --- /dev/null +++ b/migrations/20240413145104_create_idempotency_table.sql @@ -0,0 +1,14 @@ +CREATE TYPE header_pair AS ( + name TEXT, + value BYTEA +); + +CREATE TABLE idempotency ( + user_id uuid NOT NULL REFERENCES users(user_id), + idempotency_key TEXT NOT NULL, + response_status_code SMALLINT NOT NULL, + response_headers header_pair[] NOT NULL, + response_body BYTEA NOT NULL, + created_at timestamptz NOT NULL, + PRIMARY KEY(user_id, idempotency_key) +); \ No newline at end of file diff --git a/src/idempotency/key.rs b/src/idempotency/key.rs new file mode 100644 index 0000000..acac1ce --- /dev/null +++ b/src/idempotency/key.rs @@ -0,0 +1,31 @@ +#[derive(Debug)] +pub struct IdempotencyKey(String); + +impl TryFrom for IdempotencyKey { + type Error = anyhow::Error; + + fn try_from(s: String) -> Result { + if s.is_empty() { + anyhow::bail!("The idempotency key cannot be empty"); + } + + let max_len = 50; + if s.len() >= max_len { + anyhow::bail!("The idempotency key must be shorter than {max_len} characters") + } + + Ok(Self(s)) + } +} + +impl From for String { + fn from(k: IdempotencyKey) -> Self { + k.0 + } +} + +impl AsRef for IdempotencyKey { + fn as_ref(&self) -> &str { + &self.0 + } +} diff --git a/src/idempotency/mod.rs b/src/idempotency/mod.rs new file mode 100644 index 0000000..bda225f --- /dev/null +++ b/src/idempotency/mod.rs @@ -0,0 +1,5 @@ +mod key; +mod persistence; + +pub use key::IdempotencyKey; +pub use persistence::{get_saved_response, save_response}; diff --git a/src/idempotency/persistence.rs b/src/idempotency/persistence.rs new file mode 100644 index 0000000..4ee072b --- /dev/null +++ b/src/idempotency/persistence.rs @@ -0,0 +1,100 @@ +use super::IdempotencyKey; +use axum::{ + body::{to_bytes, Body}, + http::{Response, StatusCode}, +}; +use sqlx::{postgres::PgHasArrayType, PgPool}; +use uuid::Uuid; + +#[tracing::instrument(skip(db_pool, idempotency_key, user_id))] +pub async fn get_saved_response( + db_pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result>, anyhow::Error> { + let saved_response = sqlx::query!( + r#" + SELECT + response_status_code, + response_headers AS "response_headers: Vec", + response_body + FROM idempotency + WHERE + idempotency_key = $1 AND + user_id = $2 + "#, + idempotency_key.as_ref(), + user_id + ) + .fetch_optional(db_pool) + .await?; + + if let Some(r) = saved_response { + let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?; + let mut response = Response::builder().status(status_code); + for header in r.response_headers { + response = response.header(header.name, header.value); + } + Ok(Some(response.body(Body::from(r.response_body))?)) + } else { + Ok(None) + } +} + +#[tracing::instrument(skip(db_pool, idempotency_key, user_id, response))] +pub async fn save_response( + db_pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, + response: Response, +) -> Result, anyhow::Error> { + let status_code = response.status().as_u16() as i16; + let headers = { + let mut h = Vec::with_capacity(response.headers().len()); + for (name, value) in response.headers() { + let name = name.to_string(); + let value = value.as_bytes().to_owned(); + h.push(HeaderPairRecord { name, value }); + } + h + }; + let (parts, body) = response.into_parts(); + let body = to_bytes(body, usize::MAX).await?; + + sqlx::query_unchecked!( + r#" + INSERT INTO idempotency ( + user_id, + idempotency_key, + response_status_code, + response_headers, + response_body, + created_at + ) + VALUES ($1, $2, $3, $4, $5, now()) + "#, + user_id, + idempotency_key.as_ref(), + status_code, + headers, + body.as_ref() + ) + .execute(db_pool) + .await?; + + let response = Response::from_parts(parts, Body::from(body)); + Ok(response) +} + +#[derive(Debug, sqlx::Type)] +#[sqlx(type_name = "header_pair")] +struct HeaderPairRecord { + name: String, + value: Vec, +} + +impl PgHasArrayType for HeaderPairRecord { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("_header_pair") + } +} diff --git a/src/lib.rs b/src/lib.rs index 306d74d..d41f42f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod authentication; pub mod configuration; pub mod domain; pub mod email_client; +pub mod idempotency; pub mod request_id; pub mod routes; pub mod session_state; diff --git a/src/routes/admin/newsletters/get.rs b/src/routes/admin/newsletters/get.rs index d380752..fc81f64 100644 --- a/src/routes/admin/newsletters/get.rs +++ b/src/routes/admin/newsletters/get.rs @@ -1,5 +1,6 @@ use askama_axum::Template; use axum_messages::Messages; +use uuid::Uuid; #[tracing::instrument(name = "Get newsletter form", skip(messages))] pub(in crate::routes::admin) async fn newsletter_form( @@ -17,6 +18,7 @@ pub(in crate::routes::admin) async fn newsletter_form( text_content_placeholder: "Enter newsletter text", send_newsletter_button: "Send newsletter", back_link: "Back", + idempotency_key: Uuid::new_v4().into(), flashes, } } @@ -33,5 +35,6 @@ pub(in crate::routes::admin) struct NewsletterForm<'a> { text_content_placeholder: &'a str, send_newsletter_button: &'a str, back_link: &'a str, + idempotency_key: String, flashes: Vec, } diff --git a/src/routes/admin/newsletters/post.rs b/src/routes/admin/newsletters/post.rs index ce68a70..75fdcc8 100644 --- a/src/routes/admin/newsletters/post.rs +++ b/src/routes/admin/newsletters/post.rs @@ -1,20 +1,35 @@ use crate::{ app_state::AppState, + authentication::extract::SessionUserId, domain::{SubscriberEmail, SubscriptionStatus}, - utils::{e500, HttpError}, + idempotency::{get_saved_response, save_response, IdempotencyKey}, + utils::{e422, e500, HttpError}, }; use anyhow::Context; -use axum::{extract::State, response::Redirect, Form}; +use askama_axum::IntoResponse; +use axum::{body::Body, extract::State, http::Response, response::Redirect, Form}; use axum_messages::Messages; use serde::Deserialize; use sqlx::PgPool; -#[tracing::instrument(skip(app_state, messages, form))] +#[tracing::instrument(skip(app_state, user_id, messages, form))] pub(in crate::routes::admin) async fn publish_newsletter( State(app_state): State, + SessionUserId(user_id): SessionUserId, messages: Messages, Form(form): Form, -) -> Result> { +) -> Result, HttpError> { + let flash_success = || messages.info("Newsletter sent!"); + let idempotency_key: IdempotencyKey = form.idempotency_key.try_into().map_err(e422)?; + + if let Some(saved_response) = get_saved_response(&app_state.db_pool, &idempotency_key, user_id) + .await + .map_err(e500)? + { + flash_success(); + return Ok(saved_response); + } + for subscriber in get_confirmed_subscribers(&app_state.db_pool) .await .map_err(e500)? @@ -40,9 +55,12 @@ pub(in crate::routes::admin) async fn publish_newsletter( } } - messages.info("Newsletter sent!"); + flash_success(); + + let response = Redirect::to("/admin/newsletters").into_response(); + let response = save_response(&app_state.db_pool, &idempotency_key, user_id, response).await?; - Ok(Redirect::to("/admin/newsletters")) + Ok(response) } #[tracing::instrument(skip(db_pool))] @@ -76,6 +94,7 @@ pub(in crate::routes::admin) struct FormData { title: String, html_content: String, text_content: String, + idempotency_key: String, } struct ConfirmedSubscriber { diff --git a/src/utils.rs b/src/utils.rs index 02e8c5f..7d74969 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -2,12 +2,19 @@ use axum::{ http::StatusCode, response::{IntoResponse, Redirect, Response}, }; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; pub fn redirect_to(uri: &str) -> Response { Redirect::to(uri).into_response() } +pub fn e422(error: T) -> HttpError +where + T: Debug, +{ + HttpError::UnprocessableEntity(error) +} + pub fn e500(error: T) -> HttpError where T: Debug, @@ -20,18 +27,23 @@ pub enum HttpError where T: Debug, { + #[error("Unprocessable entity")] + UnprocessableEntity(#[source] T), #[error("Something went wrong")] InternalServerError(#[from] T), } impl IntoResponse for HttpError where - T: Debug, + T: Debug + Display, { fn into_response(self) -> Response { tracing::error!("{:#?}", self); match self { + Self::UnprocessableEntity(e) => { + (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()).into_response() + } Self::InternalServerError(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), } } diff --git a/templates/web/newsletter_form.html b/templates/web/newsletter_form.html index da14327..24ca33b 100644 --- a/templates/web/newsletter_form.html +++ b/templates/web/newsletter_form.html @@ -26,6 +26,7 @@

+

<- {{ back_link }}

diff --git a/tests/api/admin_newsletters.rs b/tests/api/admin_newsletters.rs index 3f85e65..197d2bf 100644 --- a/tests/api/admin_newsletters.rs +++ b/tests/api/admin_newsletters.rs @@ -1,5 +1,6 @@ use crate::helpers::{assert_redirect_to, ConfirmationLinks, TestApp}; use serde_json::json; +use uuid::Uuid; use wiremock::{ matchers::{any, method, path}, Mock, ResponseTemplate, @@ -13,6 +14,7 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { "title": "Newsletter Title", "html_content": "

Newsletter body as html.

", "text_content": "Newsletter body as text.", + "idempotency_key": Uuid::new_v4(), }); create_confirmed_subscriber(&app).await; @@ -46,6 +48,7 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { "title": "Newsletter Title", "html_content": "

Newsletter body as html.

", "text_content": "Newsletter body as text.", + "idempotency_key": Uuid::new_v4(), }); create_unconfirmed_subscriber(&app).await; @@ -79,6 +82,7 @@ async fn newsletters_returns_400_for_invalid_data() { json!({ "html_content": "

Newsletter body as html.

", "text_content": "Newsletter body as text.", + "idempotency_key": Uuid::new_v4(), }), "missing title", ), @@ -86,6 +90,7 @@ async fn newsletters_returns_400_for_invalid_data() { json!({ "title": "Newsletter Title", "text_content": "Newsletter body as text.", + "idempotency_key": Uuid::new_v4(), }), "missing html content", ), @@ -93,9 +98,18 @@ async fn newsletters_returns_400_for_invalid_data() { json!({ "title": "Newsletter Title", "html_content": "

Newsletter body as html.

", + "idempotency_key": Uuid::new_v4(), }), "missing text content", ), + ( + json!({ + "title": "Newsletter Title", + "html_content": "

Newsletter body as html.

", + "text_content": "Newsletter body as text.", + }), + "missing idempotency key", + ), ]; let response = app @@ -137,6 +151,43 @@ async fn requests_from_anonymous_users_are_redirected_to_login() { assert_redirect_to(&response, "/login"); } +#[tokio::test] +async fn newsletter_creation_is_idempotent() { + // given + let app = TestApp::spawn().await; + let newsletter_request_body = json!({ + "title": "Newsletter Title", + "html_content": "

Newsletter body as html.

", + "text_content": "Newsletter body as text.", + "idempotency_key": Uuid::new_v4(), + }); + + create_confirmed_subscriber(&app).await; + app.log_in(&app.test_user.username, &app.test_user.password) + .await; + + Mock::given(path("/email")) + .and(method("POST")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&app.email_server) + .await; + + let response = app.post_publish_newsletter(&newsletter_request_body).await; + assert_redirect_to(&response, "/admin/newsletters"); + + let html_page = app.get_newsletter_form_html().await; + assert!(html_page.contains("

Newsletter sent!

")); + + // when + let response = app.post_publish_newsletter(&newsletter_request_body).await; + + // then + assert_redirect_to(&response, "/admin/newsletters"); + let html_page = app.get_newsletter_form_html().await; + assert!(html_page.contains("

Newsletter sent!

")); +} + async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { let body = "name=Imi%C4%99%20Nazwisko&email=imie.nazwisko%40example.com"; diff --git a/tests/api/helpers.rs b/tests/api/helpers.rs index dfd8a03..feb6038 100644 --- a/tests/api/helpers.rs +++ b/tests/api/helpers.rs @@ -4,6 +4,7 @@ use linkify::{LinkFinder, LinkKind}; use once_cell::sync::Lazy; use reqwest::{header::CONTENT_TYPE, redirect, Response}; use serde::Serialize; +use serde_json::json; use sqlx::{Connection, Executor, PgConnection, PgPool}; use std::{net::SocketAddr, str::FromStr}; use uuid::Uuid; @@ -181,6 +182,18 @@ impl TestApp { .expect(Self::FAILED_TO_EXECUTE_REQUEST) } + pub async fn get_newsletter_form(&self) -> Response { + self.client + .get(self.url("/admin/newsletters")) + .send() + .await + .expect(Self::FAILED_TO_EXECUTE_REQUEST) + } + + pub async fn get_newsletter_form_html(&self) -> String { + self.get_newsletter_form().await.text().await.unwrap() + } + pub async fn get_change_password_form(&self) -> Response { self.client .get(self.url("/admin/password")) @@ -213,6 +226,17 @@ impl TestApp { .expect(Self::FAILED_TO_EXECUTE_REQUEST) } + pub async fn log_in(&self, username: &str, password: &str) -> Response { + self.client + .post(self.url("/login")) + .form(&json!({ + "username": username, + "password": password, + })) + .send() + .await + .expect(Self::FAILED_TO_EXECUTE_REQUEST) + } fn url(&self, endpoint: &str) -> String { format!("http://{}{endpoint}", self.address) }