Skip to content

Commit

Permalink
feat: 第七章代码(四)增加数据库事务
Browse files Browse the repository at this point in the history
  • Loading branch information
fan-tastic-z committed Nov 2, 2024
1 parent e8a7a55 commit 69725c1
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions src/routes/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::{
use chrono::Utc;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use serde::Deserialize;
use sqlx::PgPool;
use sqlx::{Executor, Postgres, Transaction};
use uuid::Uuid;

use crate::{
Expand All @@ -31,16 +31,24 @@ pub async fn subscribe(
Ok(subscriber) => subscriber,
Err(_) => return Err(StatusCode::BAD_REQUEST),
};
let subscriber_id = match insert_subscriber(&state.db_pool, &new_subscriber).await {
let mut transaction = match state.db_pool.begin().await {
Ok(transaction) => transaction,
Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
};
let subscriber_id = match insert_subscriber(&mut transaction, &new_subscriber).await {
Ok(subscriber_id) => subscriber_id,
Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
};
let subscription_token = generate_subscription_token();
if store_token(&state.db_pool, subscriber_id, &subscription_token)
if store_token(&mut transaction, subscriber_id, &subscription_token)
.await
.is_err()
{
return Err(StatusCode::INTERNAL_SERVER_ERROR);
};

if transaction.commit().await.is_err() {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}

if send_confirm_email(
Expand Down Expand Up @@ -83,11 +91,11 @@ pub async fn send_confirm_email(
}

pub async fn insert_subscriber(
pool: &PgPool,
transaction: &mut Transaction<'_, Postgres>,
new_subscriber: &NewSubscriber,
) -> Result<Uuid, sqlx::Error> {
let subscriber_id = Uuid::new_v4();
sqlx::query(
let query = sqlx::query(
r#"
INSERT INTO subscriptions (id, email, name, subscribed_at, status)
VALUES ($1, $2, $3, $4, 'pending_confirmation')
Expand All @@ -96,10 +104,8 @@ pub async fn insert_subscriber(
.bind(subscriber_id)
.bind(new_subscriber.email.as_ref())
.bind(new_subscriber.name.as_ref())
.bind(Utc::now())
.execute(pool)
.await
.map_err(|e| {
.bind(Utc::now());
transaction.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Expand All @@ -125,21 +131,19 @@ fn generate_subscription_token() -> String {
}

pub async fn store_token(
pool: &PgPool,
transaction: &mut Transaction<'_, Postgres>,
subscriber_id: Uuid,
subscription_token: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
let query = sqlx::query(
r#"
INSERT INTO subscription_tokens (subscription_token,subscriber_id)
VALUES ($1, $2)
"#,
)
.bind(subscription_token)
.bind(subscriber_id)
.execute(pool)
.await
.map_err(|e| {
.bind(subscriber_id);
transaction.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Expand Down

0 comments on commit 69725c1

Please sign in to comment.