Skip to content

Commit

Permalink
sqlx uuid encoding and decoding for sqlite and postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
pxp9 committed Apr 16, 2024
1 parent baef61e commit 527868b
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 90 deletions.
4 changes: 2 additions & 2 deletions fang/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ migrations = ["dep:diesel_migrations"]
[dev-dependencies]
fang-derive-error = { version = "0.1.0"}
diesel_migrations = { version = "2.1" , features = ["postgres", "sqlite" , "mysql"]}
sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "runtime-tokio-rustls", "postgres", "sqlite", "mysql"]}
sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "uuid", "runtime-tokio-rustls", "postgres", "sqlite", "mysql"]}
#console-subscriber = "0.2.0" # for tokio tracing debug

[dependencies]
Expand All @@ -52,7 +52,7 @@ typed-builder = "0.14"
typetag = "0.2"
uuid = { version = "1.1", features = ["v4"] }
fang-derive-error = { version = "0.1.0" , optional = true}
sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "runtime-tokio-rustls"], optional = true}
sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "uuid", "runtime-tokio-rustls"], optional = true}

[dependencies.diesel]
version = "2.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
-- Your SQL goes here


-- docker exec -ti mysql mysql -u root -pfang -P 3360 fang -e "$(catn fang/mysql_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql)"

CREATE TABLE fang_tasks (
id TEXT CHECK (LENGTH(id) = 36) NOT NULL PRIMARY KEY, -- UUID generated inside the language
-- why uuid is a text ? https://stackoverflow.com/questions/17277735/using-uuids-in-sqlite
-- uuid will be stored as a 16 byte BLOB
id BLOB NOT NULL PRIMARY KEY, -- UUID generated inside the language
metadata TEXT NOT NULL,
-- why metadata is text ? https://stackoverflow.com/questions/16603621/how-to-store-json-object-in-sqlite-database#16603687
error_message TEXT,
Expand Down
52 changes: 12 additions & 40 deletions fang/src/asynk/backend_sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ where
for<'r> &'r str: Encode<'r, DB> + Type<DB>,
for<'r> i32: Encode<'r, DB> + Type<DB>,
for<'r> i64: Encode<'r, DB> + Type<DB>,
for<'r> &'r Uuid: Encode<'r, DB> + Type<DB>,
for<'r> &'r Pool<DB>: Executor<'r, Database = DB>,
for<'r> <DB as HasArguments<'r>>::Arguments: IntoArguments<'r, DB>,
<DB as Database>::QueryResult: Into<AnyQueryResult>,
Expand Down Expand Up @@ -204,17 +205,9 @@ where
pool: &Pool<DB>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let mut buffer = Uuid::encode_buffer();
let uuid_as_text = params
.uuid
.unwrap()
.as_hyphenated()
.encode_lower(&mut buffer);
let uuid = params.uuid.unwrap();

let task: Task = sqlx::query_as(query)
.bind(&*uuid_as_text)
.fetch_one(pool)
.await?;
let task: Task = sqlx::query_as(query).bind(uuid).fetch_one(pool).await?;

Ok(task)
}
Expand All @@ -234,13 +227,7 @@ where
let now = now_i64;
let retries = params.task.unwrap().retries + 1;

let mut buffer = Uuid::encode_buffer();
let uuid_as_text = params
.task
.unwrap()
.id
.as_hyphenated()
.encode_lower(&mut buffer);
let uuid = params.uuid.unwrap();

let error = params.error_message.unwrap();

Expand All @@ -249,7 +236,7 @@ where
.bind(retries)
.bind(scheduled_at)
.bind(now)
.bind(&*uuid_as_text)
.bind(uuid)
.fetch_one(pool)
.await?;

Expand All @@ -262,8 +249,6 @@ where
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let uuid = Uuid::new_v4();
let mut buffer = Uuid::encode_buffer();
let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer);

let metadata = params.metadata.unwrap();

Expand All @@ -275,7 +260,7 @@ where
let uniq_hash = calculate_hash(&metadata_str);

let task: Task = sqlx::query_as(query)
.bind(uuid_as_str)
.bind(&uuid)
.bind(metadata_str)
.bind(task_type)
.bind(uniq_hash)
Expand All @@ -291,16 +276,14 @@ where
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let uuid = Uuid::new_v4();
let mut buffer = Uuid::encode_buffer();
let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer);

let scheduled_at_i64 = params.scheduled_at.unwrap().timestamp();

let metadata_str = params.metadata.unwrap().to_string();
let task_type = params.task_type.unwrap();

let task: Task = sqlx::query_as(query)
.bind(uuid_as_str)
.bind(&uuid)
.bind(metadata_str)
.bind(task_type)
.bind(scheduled_at_i64)
Expand All @@ -321,13 +304,10 @@ where

let uuid = params.uuid.unwrap();

let mut buffer = Uuid::encode_buffer();
let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer);

let task: Task = sqlx::query_as(query)
.bind(state_str)
.bind(updated_at)
.bind(&*uuid_as_text)
.bind(uuid)
.fetch_one(pool)
.await?;

Expand All @@ -341,18 +321,15 @@ where
) -> Result<Task, AsyncQueueError> {
let updated_at = Utc::now().timestamp();

let id = params.task.unwrap().id;

let mut buffer = Uuid::encode_buffer();
let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer);
let uuid = params.task.unwrap().id;

let error_message = params.error_message.unwrap();

let failed_task: Task = sqlx::query_as(query)
.bind(<&str>::from(FangTaskState::Failed))
.bind(error_message)
.bind(updated_at)
.bind(&*uuid_as_text)
.bind(&uuid)
.fetch_one(pool)
.await?;

Expand Down Expand Up @@ -391,15 +368,10 @@ where
pool: &Pool<DB>,
params: QueryParams<'_>,
) -> Result<u64, AsyncQueueError> {
let mut buffer = Uuid::encode_buffer();
let uuid_as_text = params
.uuid
.unwrap()
.as_hyphenated()
.encode_lower(&mut buffer);
let uuid = params.uuid.unwrap();

let result = sqlx::query(query)
.bind(&*uuid_as_text)
.bind(uuid)
.execute(pool)
.await?
.into()
Expand Down
25 changes: 5 additions & 20 deletions fang/src/asynk/backend_sqlx/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use crate::Task;

impl<'a> FromRow<'a, PgRow> for Task {
fn from_row(row: &'a PgRow) -> Result<Self, sqlx::Error> {
let uuid_as_text: &str = row.get("id");
let id: Uuid = row.get("id");

let id = Uuid::parse_str(uuid_as_text).unwrap();
//let id = Uuid::parse_str(uuid_as_text).unwrap();

let raw: &str = row.get("metadata"); // will work if database cast json to string
let raw = raw.replace('\\', "");
Expand All @@ -66,26 +66,11 @@ impl<'a> FromRow<'a, PgRow> for Task {

let retries: i32 = row.get("retries");

let scheduled_at_str: &str = row.get("scheduled_at");
let scheduled_at: DateTime<Utc> = row.get("scheduled_at");

// This unwrap is safe because we know that the database returns the date in the correct format
let scheduled_at: DateTime<Utc> = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z")
.unwrap()
.into();
let created_at: DateTime<Utc> = row.get("created_at");

let created_at_str: &str = row.get("created_at");

// This unwrap is safe because we know that the database returns the date in the correct format
let created_at: DateTime<Utc> = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z")
.unwrap()
.into();

let updated_at_str: &str = row.get("updated_at");

// This unwrap is safe because we know that the database returns the date in the correct format
let updated_at: DateTime<Utc> = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z")
.unwrap()
.into();
let updated_at: DateTime<Utc> = row.get("updated_at");

Ok(Task::builder()
.id(id)
Expand Down
19 changes: 5 additions & 14 deletions fang/src/asynk/backend_sqlx/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use SqlXQuery as Q;

impl<'a> FromRow<'a, SqliteRow> for Task {
fn from_row(row: &'a SqliteRow) -> Result<Self, sqlx::Error> {
let uuid_as_text: &str = row.get("id");
let id: Uuid = row.get("id");

let id = Uuid::parse_str(uuid_as_text).unwrap();
//let id = Uuid::parse_str(uuid_as_text).unwrap();

let raw: &str = row.get("metadata"); // will work if database cast json to string
let raw = raw.replace('\\', "");
Expand All @@ -60,20 +60,11 @@ impl<'a> FromRow<'a, SqliteRow> for Task {

let retries: i32 = row.get("retries");

let scheduled_at: i64 = row.get("scheduled_at");
let scheduled_at: DateTime<Utc> = row.get("scheduled_at");

// This unwrap is safe because we know that the database returns the date in the correct format
let scheduled_at: DateTime<Utc> = DateTime::from_timestamp(scheduled_at, 0).unwrap();
let created_at: DateTime<Utc> = row.get("created_at");

let created_at: i64 = row.get("created_at");

// This unwrap is safe because we know that the database returns the date in the correct format
let created_at: DateTime<Utc> = DateTime::from_timestamp(created_at, 0).unwrap();

let updated_at: i64 = row.get("updated_at");

// This unwrap is safe because we know that the database returns the date in the correct format
let updated_at: DateTime<Utc> = DateTime::from_timestamp(updated_at, 0).unwrap();
let updated_at: DateTime<Utc> = row.get("updated_at");

Ok(Task::builder()
.id(id)
Expand Down
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/fail_task.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = to_timestamp($3) WHERE id = $4::uuid RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = to_timestamp($3) WHERE id = $4 RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/fetch_task_type.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND to_timestamp($2) >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND to_timestamp($2) >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/find_task_by_id.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text FROM fang_tasks WHERE id = $1::uuid
SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1::uuid
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/find_task_by_uniq_hash.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1
SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/insert_task.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1::uuid, $2::jsonb, $3, to_timestamp($4) ) RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text
INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1, $2::jsonb, $3, to_timestamp($4) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/insert_task_uniq.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1::uuid, $2::jsonb , $3, $4, to_timestamp($5) ) RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text
INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2::jsonb , $3, $4, to_timestamp($5) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/retry_task.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = to_timestamp($3), "updated_at" = to_timestamp($4) WHERE id = $5::uuid RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text
UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = to_timestamp($3), "updated_at" = to_timestamp($4) WHERE id = $5::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/update_task_state.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = to_timestamp($2) WHERE id = $3::uuid RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = to_timestamp($2) WHERE id = $3::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/blocking/sqlite_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

diesel::table! {
fang_tasks (id) {
id -> Text,
id -> Binary,
metadata -> Text,
error_message -> Nullable<Text>,
state -> Text,
Expand Down

0 comments on commit 527868b

Please sign in to comment.