From 1704f8644ff82afb919bc29291f996bac572b62f Mon Sep 17 00:00:00 2001 From: Piotr Orzechowski Date: Sun, 14 Apr 2024 07:59:18 +0200 Subject: [PATCH] Handle concurrent publish newsletter form submission gracefully --- ...963b7b4e6961d53e99a520068d46836f1027.json} | 16 ++-- ...697326211b11923c403618bdf15d98aae00e.json} | 4 +- ...617860abd3d7a806a900aa6d608c993dabb0b.json | 15 ++++ ...51122_relax_null_checks_on_idempotency.sql | 3 + src/idempotency/mod.rs | 2 +- src/idempotency/persistence.rs | 87 +++++++++++++------ src/routes/admin/newsletters/post.rs | 15 ++-- tests/api/admin_newsletters.rs | 37 ++++++++ 8 files changed, 137 insertions(+), 42 deletions(-) rename .sqlx/{query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json => query-541a14fb26ec3a69040c24de8970963b7b4e6961d53e99a520068d46836f1027.json} (61%) rename .sqlx/{query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json => query-7dcf9727b8aa0fe38588e0366ff9697326211b11923c403618bdf15d98aae00e.json} (68%) create mode 100644 .sqlx/query-f835e8ebdcd687acf7fcf845127617860abd3d7a806a900aa6d608c993dabb0b.json create mode 100644 migrations/20240414051122_relax_null_checks_on_idempotency.sql diff --git a/.sqlx/query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json b/.sqlx/query-541a14fb26ec3a69040c24de8970963b7b4e6961d53e99a520068d46836f1027.json similarity index 61% rename from .sqlx/query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json rename to .sqlx/query-541a14fb26ec3a69040c24de8970963b7b4e6961d53e99a520068d46836f1027.json index 8e8ca05..1d84b88 100644 --- a/.sqlx/query-a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489.json +++ b/.sqlx/query-541a14fb26ec3a69040c24de8970963b7b4e6961d53e99a520068d46836f1027.json @@ -1,16 +1,16 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n response_status_code,\n response_headers AS \"response_headers: Vec\",\n response_body\n FROM idempotency\n WHERE\n idempotency_key = $1 AND\n user_id = $2\n ", + "query": "\n SELECT\n response_status_code AS \"response_status_code!\",\n response_headers AS \"response_headers!: Vec\",\n response_body AS \"response_body!\"\n FROM idempotency\n WHERE\n idempotency_key = $1 AND\n user_id = $2\n ", "describe": { "columns": [ { "ordinal": 0, - "name": "response_status_code", + "name": "response_status_code!", "type_info": "Int2" }, { "ordinal": 1, - "name": "response_headers: Vec", + "name": "response_headers!: Vec", "type_info": { "Custom": { "name": "_header_pair", @@ -38,7 +38,7 @@ }, { "ordinal": 2, - "name": "response_body", + "name": "response_body!", "type_info": "Bytea" } ], @@ -49,10 +49,10 @@ ] }, "nullable": [ - false, - false, - false + true, + true, + true ] }, - "hash": "a00b9ba5641078266d469ca4374e636dab1e6741441501f384a1a94f53c70489" + "hash": "541a14fb26ec3a69040c24de8970963b7b4e6961d53e99a520068d46836f1027" } diff --git a/.sqlx/query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json b/.sqlx/query-7dcf9727b8aa0fe38588e0366ff9697326211b11923c403618bdf15d98aae00e.json similarity index 68% rename from .sqlx/query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json rename to .sqlx/query-7dcf9727b8aa0fe38588e0366ff9697326211b11923c403618bdf15d98aae00e.json index ef59776..4d2e0e9 100644 --- a/.sqlx/query-429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd.json +++ b/.sqlx/query-7dcf9727b8aa0fe38588e0366ff9697326211b11923c403618bdf15d98aae00e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO idempotency (\n user_id,\n idempotency_key,\n response_status_code,\n response_headers,\n response_body,\n created_at\n )\n VALUES ($1, $2, $3, $4, $5, now())\n ", + "query": "\n UPDATE idempotency\n SET\n response_status_code = $3,\n response_headers = $4,\n response_body = $5\n WHERE\n user_id = $1 AND\n idempotency_key = $2\n ", "describe": { "columns": [], "parameters": { @@ -37,5 +37,5 @@ }, "nullable": [] }, - "hash": "429f0897a3c43dca32af247947c1ddd6d6ddb3240e39400ebd68f60cc6f07bbd" + "hash": "7dcf9727b8aa0fe38588e0366ff9697326211b11923c403618bdf15d98aae00e" } diff --git a/.sqlx/query-f835e8ebdcd687acf7fcf845127617860abd3d7a806a900aa6d608c993dabb0b.json b/.sqlx/query-f835e8ebdcd687acf7fcf845127617860abd3d7a806a900aa6d608c993dabb0b.json new file mode 100644 index 0000000..c22f1b8 --- /dev/null +++ b/.sqlx/query-f835e8ebdcd687acf7fcf845127617860abd3d7a806a900aa6d608c993dabb0b.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO idempotency (\n user_id,\n idempotency_key,\n created_at\n )\n VALUES ($1, $2, now())\n ON CONFLICT DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "f835e8ebdcd687acf7fcf845127617860abd3d7a806a900aa6d608c993dabb0b" +} diff --git a/migrations/20240414051122_relax_null_checks_on_idempotency.sql b/migrations/20240414051122_relax_null_checks_on_idempotency.sql new file mode 100644 index 0000000..d3c523b --- /dev/null +++ b/migrations/20240414051122_relax_null_checks_on_idempotency.sql @@ -0,0 +1,3 @@ +ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL; +ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL; +ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL; \ No newline at end of file diff --git a/src/idempotency/mod.rs b/src/idempotency/mod.rs index bda225f..b4a7b2a 100644 --- a/src/idempotency/mod.rs +++ b/src/idempotency/mod.rs @@ -2,4 +2,4 @@ mod key; mod persistence; pub use key::IdempotencyKey; -pub use persistence::{get_saved_response, save_response}; +pub use persistence::{get_saved_response, save_response, try_processing, NextAction}; diff --git a/src/idempotency/persistence.rs b/src/idempotency/persistence.rs index 4ee072b..46c0a13 100644 --- a/src/idempotency/persistence.rs +++ b/src/idempotency/persistence.rs @@ -1,11 +1,48 @@ use super::IdempotencyKey; +use anyhow::anyhow; use axum::{ body::{to_bytes, Body}, http::{Response, StatusCode}, }; -use sqlx::{postgres::PgHasArrayType, PgPool}; +use sqlx::{postgres::PgHasArrayType, Executor, PgPool, Postgres, Transaction}; use uuid::Uuid; +pub enum NextAction { + StartProcessing(Transaction<'static, Postgres>), + ReturnSavedResponse(Response), +} + +#[tracing::instrument(skip(db_pool, idempotency_key, user_id))] +pub async fn try_processing( + db_pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result { + let mut transaction = db_pool.begin().await?; + let query = sqlx::query!( + r#" + INSERT INTO idempotency ( + user_id, + idempotency_key, + created_at + ) + VALUES ($1, $2, now()) + ON CONFLICT DO NOTHING + "#, + user_id, + idempotency_key.as_ref() + ); + + if transaction.execute(query).await?.rows_affected() > 0 { + Ok(NextAction::StartProcessing(transaction)) + } else { + let response = get_saved_response(db_pool, idempotency_key, user_id) + .await? + .ok_or_else(|| anyhow!("Expected saved response was not found"))?; + Ok(NextAction::ReturnSavedResponse(response)) + } +} + #[tracing::instrument(skip(db_pool, idempotency_key, user_id))] pub async fn get_saved_response( db_pool: &PgPool, @@ -15,9 +52,9 @@ pub async fn get_saved_response( let saved_response = sqlx::query!( r#" SELECT - response_status_code, - response_headers AS "response_headers: Vec", - response_body + response_status_code AS "response_status_code!", + response_headers AS "response_headers!: Vec", + response_body AS "response_body!" FROM idempotency WHERE idempotency_key = $1 AND @@ -41,9 +78,9 @@ pub async fn get_saved_response( } } -#[tracing::instrument(skip(db_pool, idempotency_key, user_id, response))] +#[tracing::instrument(skip(transaction, idempotency_key, user_id, response))] pub async fn save_response( - db_pool: &PgPool, + mut transaction: Transaction<'static, Postgres>, idempotency_key: &IdempotencyKey, user_id: Uuid, response: Response, @@ -61,26 +98,26 @@ pub async fn save_response( let (parts, body) = response.into_parts(); let body = to_bytes(body, usize::MAX).await?; - sqlx::query_unchecked!( - r#" - INSERT INTO idempotency ( + transaction + .execute(sqlx::query_unchecked!( + r#" + UPDATE idempotency + SET + response_status_code = $3, + response_headers = $4, + response_body = $5 + WHERE + user_id = $1 AND + idempotency_key = $2 + "#, user_id, - idempotency_key, - response_status_code, - response_headers, - response_body, - created_at - ) - VALUES ($1, $2, $3, $4, $5, now()) - "#, - user_id, - idempotency_key.as_ref(), - status_code, - headers, - body.as_ref() - ) - .execute(db_pool) - .await?; + idempotency_key.as_ref(), + status_code, + headers, + body.as_ref() + )) + .await?; + transaction.commit().await?; let response = Response::from_parts(parts, Body::from(body)); Ok(response) diff --git a/src/routes/admin/newsletters/post.rs b/src/routes/admin/newsletters/post.rs index 75fdcc8..d8cd9b5 100644 --- a/src/routes/admin/newsletters/post.rs +++ b/src/routes/admin/newsletters/post.rs @@ -2,7 +2,7 @@ use crate::{ app_state::AppState, authentication::extract::SessionUserId, domain::{SubscriberEmail, SubscriptionStatus}, - idempotency::{get_saved_response, save_response, IdempotencyKey}, + idempotency::{save_response, try_processing, IdempotencyKey, NextAction}, utils::{e422, e500, HttpError}, }; use anyhow::Context; @@ -22,13 +22,16 @@ pub(in crate::routes::admin) async fn publish_newsletter( let flash_success = || messages.info("Newsletter sent!"); let idempotency_key: IdempotencyKey = form.idempotency_key.try_into().map_err(e422)?; - if let Some(saved_response) = get_saved_response(&app_state.db_pool, &idempotency_key, user_id) + let transaction = match try_processing(&app_state.db_pool, &idempotency_key, user_id) .await .map_err(e500)? { - flash_success(); - return Ok(saved_response); - } + NextAction::StartProcessing(transaction) => transaction, + NextAction::ReturnSavedResponse(saved_response) => { + flash_success(); + return Ok(saved_response); + } + }; for subscriber in get_confirmed_subscribers(&app_state.db_pool) .await @@ -58,7 +61,7 @@ pub(in crate::routes::admin) async fn publish_newsletter( flash_success(); let response = Redirect::to("/admin/newsletters").into_response(); - let response = save_response(&app_state.db_pool, &idempotency_key, user_id, response).await?; + let response = save_response(transaction, &idempotency_key, user_id, response).await?; Ok(response) } diff --git a/tests/api/admin_newsletters.rs b/tests/api/admin_newsletters.rs index 197d2bf..bb361aa 100644 --- a/tests/api/admin_newsletters.rs +++ b/tests/api/admin_newsletters.rs @@ -1,5 +1,6 @@ use crate::helpers::{assert_redirect_to, ConfirmationLinks, TestApp}; use serde_json::json; +use std::time::Duration; use uuid::Uuid; use wiremock::{ matchers::{any, method, path}, @@ -188,6 +189,42 @@ async fn newsletter_creation_is_idempotent() { assert!(html_page.contains("

Newsletter sent!

")); } +#[tokio::test] +async fn concurrent_form_submission_is_handled_gracefully() { + // given + let app = TestApp::spawn().await; + let newsletter_request_body = json!({ + "title": "Newsletter Title", + "html_content": "

Newsletter body as html.

", + "text_content": "Newsletter body as text.", + "idempotency_key": Uuid::new_v4(), + }); + + create_confirmed_subscriber(&app).await; + app.log_in(&app.test_user.username, &app.test_user.password) + .await; + + Mock::given(path("/email")) + .and(method("POST")) + .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2))) + .expect(1) + .mount(&app.email_server) + .await; + + // when + let response1 = app.post_publish_newsletter(&newsletter_request_body); + let response2 = app.post_publish_newsletter(&newsletter_request_body); + let (response1, response2) = tokio::join!(response1, response2); + + // then + assert_redirect_to(&response1, "/admin/newsletters"); + assert_eq!(response1.status(), response2.status()); + assert_eq!( + response1.text().await.unwrap(), + response2.text().await.unwrap() + ); +} + async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { let body = "name=Imi%C4%99%20Nazwisko&email=imie.nazwisko%40example.com";