Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
Handle concurrent publish newsletter form submission gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
0rzech committed Apr 14, 2024
1 parent bdf8e44 commit 1704f86
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 42 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL;
2 changes: 1 addition & 1 deletion src/idempotency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ mod key;
mod persistence;

pub use key::IdempotencyKey;
pub use persistence::{get_saved_response, save_response};
pub use persistence::{get_saved_response, save_response, try_processing, NextAction};
87 changes: 62 additions & 25 deletions src/idempotency/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,48 @@
use super::IdempotencyKey;
use anyhow::anyhow;
use axum::{
body::{to_bytes, Body},
http::{Response, StatusCode},
};
use sqlx::{postgres::PgHasArrayType, PgPool};
use sqlx::{postgres::PgHasArrayType, Executor, PgPool, Postgres, Transaction};
use uuid::Uuid;

pub enum NextAction {
StartProcessing(Transaction<'static, Postgres>),
ReturnSavedResponse(Response<Body>),
}

#[tracing::instrument(skip(db_pool, idempotency_key, user_id))]
pub async fn try_processing(
db_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<NextAction, anyhow::Error> {
let mut transaction = db_pool.begin().await?;
let query = sqlx::query!(
r#"
INSERT INTO idempotency (
user_id,
idempotency_key,
created_at
)
VALUES ($1, $2, now())
ON CONFLICT DO NOTHING
"#,
user_id,
idempotency_key.as_ref()
);

if transaction.execute(query).await?.rows_affected() > 0 {
Ok(NextAction::StartProcessing(transaction))
} else {
let response = get_saved_response(db_pool, idempotency_key, user_id)
.await?
.ok_or_else(|| anyhow!("Expected saved response was not found"))?;
Ok(NextAction::ReturnSavedResponse(response))
}
}

#[tracing::instrument(skip(db_pool, idempotency_key, user_id))]
pub async fn get_saved_response(
db_pool: &PgPool,
Expand All @@ -15,9 +52,9 @@ pub async fn get_saved_response(
let saved_response = sqlx::query!(
r#"
SELECT
response_status_code,
response_headers AS "response_headers: Vec<HeaderPairRecord>",
response_body
response_status_code AS "response_status_code!",
response_headers AS "response_headers!: Vec<HeaderPairRecord>",
response_body AS "response_body!"
FROM idempotency
WHERE
idempotency_key = $1 AND
Expand All @@ -41,9 +78,9 @@ pub async fn get_saved_response(
}
}

#[tracing::instrument(skip(db_pool, idempotency_key, user_id, response))]
#[tracing::instrument(skip(transaction, idempotency_key, user_id, response))]
pub async fn save_response(
db_pool: &PgPool,
mut transaction: Transaction<'static, Postgres>,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
response: Response<Body>,
Expand All @@ -61,26 +98,26 @@ pub async fn save_response(
let (parts, body) = response.into_parts();
let body = to_bytes(body, usize::MAX).await?;

sqlx::query_unchecked!(
r#"
INSERT INTO idempotency (
transaction
.execute(sqlx::query_unchecked!(
r#"
UPDATE idempotency
SET
response_status_code = $3,
response_headers = $4,
response_body = $5
WHERE
user_id = $1 AND
idempotency_key = $2
"#,
user_id,
idempotency_key,
response_status_code,
response_headers,
response_body,
created_at
)
VALUES ($1, $2, $3, $4, $5, now())
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
body.as_ref()
)
.execute(db_pool)
.await?;
idempotency_key.as_ref(),
status_code,
headers,
body.as_ref()
))
.await?;
transaction.commit().await?;

let response = Response::from_parts(parts, Body::from(body));
Ok(response)
Expand Down
15 changes: 9 additions & 6 deletions src/routes/admin/newsletters/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
app_state::AppState,
authentication::extract::SessionUserId,
domain::{SubscriberEmail, SubscriptionStatus},
idempotency::{get_saved_response, save_response, IdempotencyKey},
idempotency::{save_response, try_processing, IdempotencyKey, NextAction},
utils::{e422, e500, HttpError},
};
use anyhow::Context;
Expand All @@ -22,13 +22,16 @@ pub(in crate::routes::admin) async fn publish_newsletter(
let flash_success = || messages.info("Newsletter sent!");
let idempotency_key: IdempotencyKey = form.idempotency_key.try_into().map_err(e422)?;

if let Some(saved_response) = get_saved_response(&app_state.db_pool, &idempotency_key, user_id)
let transaction = match try_processing(&app_state.db_pool, &idempotency_key, user_id)
.await
.map_err(e500)?
{
flash_success();
return Ok(saved_response);
}
NextAction::StartProcessing(transaction) => transaction,
NextAction::ReturnSavedResponse(saved_response) => {
flash_success();
return Ok(saved_response);
}
};

for subscriber in get_confirmed_subscribers(&app_state.db_pool)
.await
Expand Down Expand Up @@ -58,7 +61,7 @@ pub(in crate::routes::admin) async fn publish_newsletter(
flash_success();

let response = Redirect::to("/admin/newsletters").into_response();
let response = save_response(&app_state.db_pool, &idempotency_key, user_id, response).await?;
let response = save_response(transaction, &idempotency_key, user_id, response).await?;

Ok(response)
}
Expand Down
37 changes: 37 additions & 0 deletions tests/api/admin_newsletters.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::helpers::{assert_redirect_to, ConfirmationLinks, TestApp};
use serde_json::json;
use std::time::Duration;
use uuid::Uuid;
use wiremock::{
matchers::{any, method, path},
Expand Down Expand Up @@ -188,6 +189,42 @@ async fn newsletter_creation_is_idempotent() {
assert!(html_page.contains("<p><i>Newsletter sent!</i></p>"));
}

#[tokio::test]
async fn concurrent_form_submission_is_handled_gracefully() {
// given
let app = TestApp::spawn().await;
let newsletter_request_body = json!({
"title": "Newsletter Title",
"html_content": "<p>Newsletter body as html.</p>",
"text_content": "Newsletter body as text.",
"idempotency_key": Uuid::new_v4(),
});

create_confirmed_subscriber(&app).await;
app.log_in(&app.test_user.username, &app.test_user.password)
.await;

Mock::given(path("/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2)))
.expect(1)
.mount(&app.email_server)
.await;

// when
let response1 = app.post_publish_newsletter(&newsletter_request_body);
let response2 = app.post_publish_newsletter(&newsletter_request_body);
let (response1, response2) = tokio::join!(response1, response2);

// then
assert_redirect_to(&response1, "/admin/newsletters");
assert_eq!(response1.status(), response2.status());
assert_eq!(
response1.text().await.unwrap(),
response2.text().await.unwrap()
);
}

async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks {
let body = "name=Imi%C4%99%20Nazwisko&email=imie.nazwisko%40example.com";

Expand Down

0 comments on commit 1704f86

Please sign in to comment.