Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
Add idempotency to publish newsletter endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
0rzech committed Apr 13, 2024
1 parent 31a4934 commit bdf8e44
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 8 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions migrations/20240413145104_create_idempotency_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TYPE header_pair AS (
name TEXT,
value BYTEA
);

CREATE TABLE idempotency (
user_id uuid NOT NULL REFERENCES users(user_id),
idempotency_key TEXT NOT NULL,
response_status_code SMALLINT NOT NULL,
response_headers header_pair[] NOT NULL,
response_body BYTEA NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY(user_id, idempotency_key)
);
31 changes: 31 additions & 0 deletions src/idempotency/key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#[derive(Debug)]
pub struct IdempotencyKey(String);

impl TryFrom<String> for IdempotencyKey {
type Error = anyhow::Error;

fn try_from(s: String) -> Result<Self, Self::Error> {
if s.is_empty() {
anyhow::bail!("The idempotency key cannot be empty");
}

let max_len = 50;
if s.len() >= max_len {
anyhow::bail!("The idempotency key must be shorter than {max_len} characters")
}

Ok(Self(s))
}
}

impl From<IdempotencyKey> for String {
fn from(k: IdempotencyKey) -> Self {
k.0
}
}

impl AsRef<str> for IdempotencyKey {
fn as_ref(&self) -> &str {
&self.0
}
}
5 changes: 5 additions & 0 deletions src/idempotency/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod key;
mod persistence;

pub use key::IdempotencyKey;
pub use persistence::{get_saved_response, save_response};
100 changes: 100 additions & 0 deletions src/idempotency/persistence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::IdempotencyKey;
use axum::{
body::{to_bytes, Body},
http::{Response, StatusCode},
};
use sqlx::{postgres::PgHasArrayType, PgPool};
use uuid::Uuid;

#[tracing::instrument(skip(db_pool, idempotency_key, user_id))]
pub async fn get_saved_response(
db_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<Option<Response<Body>>, anyhow::Error> {
let saved_response = sqlx::query!(
r#"
SELECT
response_status_code,
response_headers AS "response_headers: Vec<HeaderPairRecord>",
response_body
FROM idempotency
WHERE
idempotency_key = $1 AND
user_id = $2
"#,
idempotency_key.as_ref(),
user_id
)
.fetch_optional(db_pool)
.await?;

if let Some(r) = saved_response {
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
let mut response = Response::builder().status(status_code);
for header in r.response_headers {
response = response.header(header.name, header.value);
}
Ok(Some(response.body(Body::from(r.response_body))?))
} else {
Ok(None)
}
}

#[tracing::instrument(skip(db_pool, idempotency_key, user_id, response))]
pub async fn save_response(
db_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
response: Response<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let status_code = response.status().as_u16() as i16;
let headers = {
let mut h = Vec::with_capacity(response.headers().len());
for (name, value) in response.headers() {
let name = name.to_string();
let value = value.as_bytes().to_owned();
h.push(HeaderPairRecord { name, value });
}
h
};
let (parts, body) = response.into_parts();
let body = to_bytes(body, usize::MAX).await?;

sqlx::query_unchecked!(
r#"
INSERT INTO idempotency (
user_id,
idempotency_key,
response_status_code,
response_headers,
response_body,
created_at
)
VALUES ($1, $2, $3, $4, $5, now())
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
body.as_ref()
)
.execute(db_pool)
.await?;

let response = Response::from_parts(parts, Body::from(body));
Ok(response)
}

#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
struct HeaderPairRecord {
name: String,
value: Vec<u8>,
}

impl PgHasArrayType for HeaderPairRecord {
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
sqlx::postgres::PgTypeInfo::with_name("_header_pair")
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod authentication;
pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod request_id;
pub mod routes;
pub mod session_state;
Expand Down
3 changes: 3 additions & 0 deletions src/routes/admin/newsletters/get.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use askama_axum::Template;
use axum_messages::Messages;
use uuid::Uuid;

#[tracing::instrument(name = "Get newsletter form", skip(messages))]
pub(in crate::routes::admin) async fn newsletter_form(
Expand All @@ -17,6 +18,7 @@ pub(in crate::routes::admin) async fn newsletter_form(
text_content_placeholder: "Enter newsletter text",
send_newsletter_button: "Send newsletter",
back_link: "Back",
idempotency_key: Uuid::new_v4().into(),
flashes,
}
}
Expand All @@ -33,5 +35,6 @@ pub(in crate::routes::admin) struct NewsletterForm<'a> {
text_content_placeholder: &'a str,
send_newsletter_button: &'a str,
back_link: &'a str,
idempotency_key: String,
flashes: Vec<String>,
}
31 changes: 25 additions & 6 deletions src/routes/admin/newsletters/post.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
use crate::{
app_state::AppState,
authentication::extract::SessionUserId,
domain::{SubscriberEmail, SubscriptionStatus},
utils::{e500, HttpError},
idempotency::{get_saved_response, save_response, IdempotencyKey},
utils::{e422, e500, HttpError},
};
use anyhow::Context;
use axum::{extract::State, response::Redirect, Form};
use askama_axum::IntoResponse;
use axum::{body::Body, extract::State, http::Response, response::Redirect, Form};
use axum_messages::Messages;
use serde::Deserialize;
use sqlx::PgPool;

#[tracing::instrument(skip(app_state, messages, form))]
#[tracing::instrument(skip(app_state, user_id, messages, form))]
pub(in crate::routes::admin) async fn publish_newsletter(
State(app_state): State<AppState>,
SessionUserId(user_id): SessionUserId,
messages: Messages,
Form(form): Form<FormData>,
) -> Result<Redirect, HttpError<anyhow::Error>> {
) -> Result<Response<Body>, HttpError<anyhow::Error>> {
let flash_success = || messages.info("Newsletter sent!");
let idempotency_key: IdempotencyKey = form.idempotency_key.try_into().map_err(e422)?;

if let Some(saved_response) = get_saved_response(&app_state.db_pool, &idempotency_key, user_id)
.await
.map_err(e500)?
{
flash_success();
return Ok(saved_response);
}

for subscriber in get_confirmed_subscribers(&app_state.db_pool)
.await
.map_err(e500)?
Expand All @@ -40,9 +55,12 @@ pub(in crate::routes::admin) async fn publish_newsletter(
}
}

messages.info("Newsletter sent!");
flash_success();

let response = Redirect::to("/admin/newsletters").into_response();
let response = save_response(&app_state.db_pool, &idempotency_key, user_id, response).await?;

Ok(Redirect::to("/admin/newsletters"))
Ok(response)
}

#[tracing::instrument(skip(db_pool))]
Expand Down Expand Up @@ -76,6 +94,7 @@ pub(in crate::routes::admin) struct FormData {
title: String,
html_content: String,
text_content: String,
idempotency_key: String,
}

struct ConfirmedSubscriber {
Expand Down
16 changes: 14 additions & 2 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Redirect, Response},
};
use std::fmt::Debug;
use std::fmt::{Debug, Display};

pub fn redirect_to(uri: &str) -> Response {
Redirect::to(uri).into_response()
}

pub fn e422<T>(error: T) -> HttpError<T>
where
T: Debug,
{
HttpError::UnprocessableEntity(error)
}

pub fn e500<T>(error: T) -> HttpError<T>
where
T: Debug,
Expand All @@ -20,18 +27,23 @@ pub enum HttpError<T>
where
T: Debug,
{
#[error("Unprocessable entity")]
UnprocessableEntity(#[source] T),
#[error("Something went wrong")]
InternalServerError(#[from] T),
}

impl<T> IntoResponse for HttpError<T>
where
T: Debug,
T: Debug + Display,
{
fn into_response(self) -> Response {
tracing::error!("{:#?}", self);

match self {
Self::UnprocessableEntity(e) => {
(StatusCode::UNPROCESSABLE_ENTITY, e.to_string()).into_response()
}
Self::InternalServerError(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
Expand Down
Loading

0 comments on commit bdf8e44

Please sign in to comment.