Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
Use db transaction in subscribe endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
0rzech committed Mar 10, 2024
1 parent 50842f9 commit 6788ec5
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions src/routes/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axum::{
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use reqwest::Error;
use serde::Deserialize;
use sqlx::PgPool;
use sqlx::{Executor, Postgres, Transaction};
use std::iter::repeat_with;
use time::OffsetDateTime;
use uuid::Uuid;
Expand All @@ -38,20 +38,33 @@ async fn subscribe(State(app_state): State<AppState>, Form(form): Form<FormData>
}
};

let subscriber_id = match insert_subscriber(&new_subscriber, &app_state.db_pool).await {
let mut transaction = match app_state.db_pool.begin().await {
Ok(transaction) => transaction,
Err(e) => {
tracing::error!("Failed to begin transaction: {:?}", e);
return StatusCode::INTERNAL_SERVER_ERROR;
}
};

let subscriber_id = match insert_subscriber(&new_subscriber, &mut transaction).await {
Ok(subscriber_id) => subscriber_id,
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
};

let subscription_token = generate_subscription_token();

if store_token(subscriber_id, &subscription_token, &app_state.db_pool)
if store_token(subscriber_id, &subscription_token, &mut transaction)
.await
.is_err()
{
return StatusCode::INTERNAL_SERVER_ERROR;
}

if let Err(e) = transaction.commit().await {
tracing::error!("Failed to commit transaction: {:?}", e);
return StatusCode::INTERNAL_SERVER_ERROR;
}

if send_confirmation_email(
&app_state.email_client,
new_subscriber,
Expand All @@ -69,15 +82,14 @@ async fn subscribe(State(app_state): State<AppState>, Form(form): Form<FormData>

#[tracing::instrument(
name = "Saving new subscriber details in the database",
skip(new_subscriber, db_pool)
skip(new_subscriber, db_transaction)
)]
async fn insert_subscriber(
new_subscriber: &NewSubscriber,
db_pool: &PgPool,
db_transaction: &mut Transaction<'_, Postgres>,
) -> Result<Uuid, sqlx::Error> {
let subscriber_id = Uuid::new_v4();

sqlx::query!(
let query = sqlx::query!(
r#"
INSERT INTO subscriptions (id, email, name, subscribed_at, status)
VALUES ($1, $2, $3, $4, 'pending_confirmation')
Expand All @@ -86,10 +98,9 @@ async fn insert_subscriber(
new_subscriber.email.as_ref(),
new_subscriber.name.as_ref(),
OffsetDateTime::now_utc(),
)
.execute(db_pool)
.await
.map_err(|e| {
);

db_transaction.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {e:?}");
e
})?;
Expand Down Expand Up @@ -148,24 +159,23 @@ fn generate_subscription_token() -> String {

#[tracing::instrument(
name = "Storing subscription token in the database",
skip(subscription_token, db_pool)
skip(subscription_token, db_transaction)
)]
async fn store_token(
subscriber_id: Uuid,
subscription_token: &str,
db_pool: &PgPool,
db_transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
let query = sqlx::query!(
r#"
INSERT INTO subscription_tokens (subscription_token, subscriber_id)
VALUES ($1, $2)
"#,
subscription_token,
subscriber_id
)
.execute(db_pool)
.await
.map_err(|e| {
);

db_transaction.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Expand Down

0 comments on commit 6788ec5

Please sign in to comment.