From 527868bbd763e3692f66755cf7e6ecf2ea58a071 Mon Sep 17 00:00:00 2001 From: pxp9 <48651252+pxp9@users.noreply.github.com> Date: Tue, 16 Apr 2024 10:54:47 +0200 Subject: [PATCH] sqlx uuid encoding and decoding for sqlite and postgres --- fang/Cargo.toml | 4 +- .../up.sql | 7 +-- fang/src/asynk/backend_sqlx.rs | 52 +++++-------------- fang/src/asynk/backend_sqlx/postgres.rs | 25 ++------- fang/src/asynk/backend_sqlx/sqlite.rs | 19 ++----- fang/src/asynk/queries_postgres/fail_task.sql | 2 +- .../queries_postgres/fetch_task_type.sql | 2 +- .../queries_postgres/find_task_by_id.sql | 2 +- .../find_task_by_uniq_hash.sql | 2 +- .../asynk/queries_postgres/insert_task.sql | 2 +- .../queries_postgres/insert_task_uniq.sql | 2 +- .../src/asynk/queries_postgres/retry_task.sql | 2 +- .../queries_postgres/update_task_state.sql | 2 +- fang/src/blocking/sqlite_schema.rs | 2 +- 14 files changed, 35 insertions(+), 90 deletions(-) diff --git a/fang/Cargo.toml b/fang/Cargo.toml index a38cd19d..2fe1f2a3 100644 --- a/fang/Cargo.toml +++ b/fang/Cargo.toml @@ -35,7 +35,7 @@ migrations = ["dep:diesel_migrations"] [dev-dependencies] fang-derive-error = { version = "0.1.0"} diesel_migrations = { version = "2.1" , features = ["postgres", "sqlite" , "mysql"]} -sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "runtime-tokio-rustls", "postgres", "sqlite", "mysql"]} +sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "uuid", "runtime-tokio-rustls", "postgres", "sqlite", "mysql"]} #console-subscriber = "0.2.0" # for tokio tracing debug [dependencies] @@ -52,7 +52,7 @@ typed-builder = "0.14" typetag = "0.2" uuid = { version = "1.1", features = ["v4"] } fang-derive-error = { version = "0.1.0" , optional = true} -sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "runtime-tokio-rustls"], optional = true} +sqlx = {version = "0.6.3", features = ["any" , "macros" , "chrono", "uuid", "runtime-tokio-rustls"], optional = true} [dependencies.diesel] version = "2.1" diff --git a/fang/sqlite_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql b/fang/sqlite_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql index 4c88cc79..4c9d6906 100644 --- a/fang/sqlite_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql +++ b/fang/sqlite_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql @@ -1,11 +1,8 @@ -- Your SQL goes here - --- docker exec -ti mysql mysql -u root -pfang -P 3360 fang -e "$(catn fang/mysql_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql)" - CREATE TABLE fang_tasks ( - id TEXT CHECK (LENGTH(id) = 36) NOT NULL PRIMARY KEY, -- UUID generated inside the language - -- why uuid is a text ? https://stackoverflow.com/questions/17277735/using-uuids-in-sqlite + -- uuid will be stored as a 16 byte BLOB + id BLOB NOT NULL PRIMARY KEY, -- UUID generated inside the language metadata TEXT NOT NULL, -- why metadata is text ? https://stackoverflow.com/questions/16603621/how-to-store-json-object-in-sqlite-database#16603687 error_message TEXT, diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index 43e9cfff..fcbbe3d3 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -159,6 +159,7 @@ where for<'r> &'r str: Encode<'r, DB> + Type, for<'r> i32: Encode<'r, DB> + Type, for<'r> i64: Encode<'r, DB> + Type, + for<'r> &'r Uuid: Encode<'r, DB> + Type, for<'r> &'r Pool: Executor<'r, Database = DB>, for<'r> >::Arguments: IntoArguments<'r, DB>, ::QueryResult: Into, @@ -204,17 +205,9 @@ where pool: &Pool, params: QueryParams<'_>, ) -> Result { - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .uuid - .unwrap() - .as_hyphenated() - .encode_lower(&mut buffer); + let uuid = params.uuid.unwrap(); - let task: Task = sqlx::query_as(query) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; + let task: Task = sqlx::query_as(query).bind(uuid).fetch_one(pool).await?; Ok(task) } @@ -234,13 +227,7 @@ where let now = now_i64; let retries = params.task.unwrap().retries + 1; - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .task - .unwrap() - .id - .as_hyphenated() - .encode_lower(&mut buffer); + let uuid = params.uuid.unwrap(); let error = params.error_message.unwrap(); @@ -249,7 +236,7 @@ where .bind(retries) .bind(scheduled_at) .bind(now) - .bind(&*uuid_as_text) + .bind(uuid) .fetch_one(pool) .await?; @@ -262,8 +249,6 @@ where params: QueryParams<'_>, ) -> Result { let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); let metadata = params.metadata.unwrap(); @@ -275,7 +260,7 @@ where let uniq_hash = calculate_hash(&metadata_str); let task: Task = sqlx::query_as(query) - .bind(uuid_as_str) + .bind(&uuid) .bind(metadata_str) .bind(task_type) .bind(uniq_hash) @@ -291,8 +276,6 @@ where params: QueryParams<'_>, ) -> Result { let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); let scheduled_at_i64 = params.scheduled_at.unwrap().timestamp(); @@ -300,7 +283,7 @@ where let task_type = params.task_type.unwrap(); let task: Task = sqlx::query_as(query) - .bind(uuid_as_str) + .bind(&uuid) .bind(metadata_str) .bind(task_type) .bind(scheduled_at_i64) @@ -321,13 +304,10 @@ where let uuid = params.uuid.unwrap(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - let task: Task = sqlx::query_as(query) .bind(state_str) .bind(updated_at) - .bind(&*uuid_as_text) + .bind(uuid) .fetch_one(pool) .await?; @@ -341,10 +321,7 @@ where ) -> Result { let updated_at = Utc::now().timestamp(); - let id = params.task.unwrap().id; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + let uuid = params.task.unwrap().id; let error_message = params.error_message.unwrap(); @@ -352,7 +329,7 @@ where .bind(<&str>::from(FangTaskState::Failed)) .bind(error_message) .bind(updated_at) - .bind(&*uuid_as_text) + .bind(&uuid) .fetch_one(pool) .await?; @@ -391,15 +368,10 @@ where pool: &Pool, params: QueryParams<'_>, ) -> Result { - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .uuid - .unwrap() - .as_hyphenated() - .encode_lower(&mut buffer); + let uuid = params.uuid.unwrap(); let result = sqlx::query(query) - .bind(&*uuid_as_text) + .bind(uuid) .execute(pool) .await? .into() diff --git a/fang/src/asynk/backend_sqlx/postgres.rs b/fang/src/asynk/backend_sqlx/postgres.rs index 5afe304a..615ebbe4 100644 --- a/fang/src/asynk/backend_sqlx/postgres.rs +++ b/fang/src/asynk/backend_sqlx/postgres.rs @@ -42,9 +42,9 @@ use crate::Task; impl<'a> FromRow<'a, PgRow> for Task { fn from_row(row: &'a PgRow) -> Result { - let uuid_as_text: &str = row.get("id"); + let id: Uuid = row.get("id"); - let id = Uuid::parse_str(uuid_as_text).unwrap(); + //let id = Uuid::parse_str(uuid_as_text).unwrap(); let raw: &str = row.get("metadata"); // will work if database cast json to string let raw = raw.replace('\\', ""); @@ -66,26 +66,11 @@ impl<'a> FromRow<'a, PgRow> for Task { let retries: i32 = row.get("retries"); - let scheduled_at_str: &str = row.get("scheduled_at"); + let scheduled_at: DateTime = row.get("scheduled_at"); - // This unwrap is safe because we know that the database returns the date in the correct format - let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") - .unwrap() - .into(); + let created_at: DateTime = row.get("created_at"); - let created_at_str: &str = row.get("created_at"); - - // This unwrap is safe because we know that the database returns the date in the correct format - let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") - .unwrap() - .into(); - - let updated_at_str: &str = row.get("updated_at"); - - // This unwrap is safe because we know that the database returns the date in the correct format - let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") - .unwrap() - .into(); + let updated_at: DateTime = row.get("updated_at"); Ok(Task::builder() .id(id) diff --git a/fang/src/asynk/backend_sqlx/sqlite.rs b/fang/src/asynk/backend_sqlx/sqlite.rs index a3ede36b..c039633d 100644 --- a/fang/src/asynk/backend_sqlx/sqlite.rs +++ b/fang/src/asynk/backend_sqlx/sqlite.rs @@ -36,9 +36,9 @@ use SqlXQuery as Q; impl<'a> FromRow<'a, SqliteRow> for Task { fn from_row(row: &'a SqliteRow) -> Result { - let uuid_as_text: &str = row.get("id"); + let id: Uuid = row.get("id"); - let id = Uuid::parse_str(uuid_as_text).unwrap(); + //let id = Uuid::parse_str(uuid_as_text).unwrap(); let raw: &str = row.get("metadata"); // will work if database cast json to string let raw = raw.replace('\\', ""); @@ -60,20 +60,11 @@ impl<'a> FromRow<'a, SqliteRow> for Task { let retries: i32 = row.get("retries"); - let scheduled_at: i64 = row.get("scheduled_at"); + let scheduled_at: DateTime = row.get("scheduled_at"); - // This unwrap is safe because we know that the database returns the date in the correct format - let scheduled_at: DateTime = DateTime::from_timestamp(scheduled_at, 0).unwrap(); + let created_at: DateTime = row.get("created_at"); - let created_at: i64 = row.get("created_at"); - - // This unwrap is safe because we know that the database returns the date in the correct format - let created_at: DateTime = DateTime::from_timestamp(created_at, 0).unwrap(); - - let updated_at: i64 = row.get("updated_at"); - - // This unwrap is safe because we know that the database returns the date in the correct format - let updated_at: DateTime = DateTime::from_timestamp(updated_at, 0).unwrap(); + let updated_at: DateTime = row.get("updated_at"); Ok(Task::builder() .id(id) diff --git a/fang/src/asynk/queries_postgres/fail_task.sql b/fang/src/asynk/queries_postgres/fail_task.sql index bb057d75..2cb1f337 100644 --- a/fang/src/asynk/queries_postgres/fail_task.sql +++ b/fang/src/asynk/queries_postgres/fail_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = to_timestamp($3) WHERE id = $4::uuid RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text \ No newline at end of file +UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = to_timestamp($3) WHERE id = $4 RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at \ No newline at end of file diff --git a/fang/src/asynk/queries_postgres/fetch_task_type.sql b/fang/src/asynk/queries_postgres/fetch_task_type.sql index 54ae20d3..3d39ca48 100644 --- a/fang/src/asynk/queries_postgres/fetch_task_type.sql +++ b/fang/src/asynk/queries_postgres/fetch_task_type.sql @@ -1 +1 @@ -SELECT id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND to_timestamp($2) >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED +SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND to_timestamp($2) >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/fang/src/asynk/queries_postgres/find_task_by_id.sql b/fang/src/asynk/queries_postgres/find_task_by_id.sql index d6e9ee80..7b0c419f 100644 --- a/fang/src/asynk/queries_postgres/find_task_by_id.sql +++ b/fang/src/asynk/queries_postgres/find_task_by_id.sql @@ -1 +1 @@ -SELECT id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text FROM fang_tasks WHERE id = $1::uuid +SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1::uuid diff --git a/fang/src/asynk/queries_postgres/find_task_by_uniq_hash.sql b/fang/src/asynk/queries_postgres/find_task_by_uniq_hash.sql index df3d3aa7..3c7bb332 100644 --- a/fang/src/asynk/queries_postgres/find_task_by_uniq_hash.sql +++ b/fang/src/asynk/queries_postgres/find_task_by_uniq_hash.sql @@ -1 +1 @@ -SELECT id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1 +SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1 diff --git a/fang/src/asynk/queries_postgres/insert_task.sql b/fang/src/asynk/queries_postgres/insert_task.sql index 530ad8bd..e5becd6d 100644 --- a/fang/src/asynk/queries_postgres/insert_task.sql +++ b/fang/src/asynk/queries_postgres/insert_task.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1::uuid, $2::jsonb, $3, to_timestamp($4) ) RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text +INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1, $2::jsonb, $3, to_timestamp($4) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/asynk/queries_postgres/insert_task_uniq.sql b/fang/src/asynk/queries_postgres/insert_task_uniq.sql index 21e6ed8c..9a8d3958 100644 --- a/fang/src/asynk/queries_postgres/insert_task_uniq.sql +++ b/fang/src/asynk/queries_postgres/insert_task_uniq.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1::uuid, $2::jsonb , $3, $4, to_timestamp($5) ) RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text +INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2::jsonb , $3, $4, to_timestamp($5) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/asynk/queries_postgres/retry_task.sql b/fang/src/asynk/queries_postgres/retry_task.sql index e42eaa38..5ad6c9bb 100644 --- a/fang/src/asynk/queries_postgres/retry_task.sql +++ b/fang/src/asynk/queries_postgres/retry_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = to_timestamp($3), "updated_at" = to_timestamp($4) WHERE id = $5::uuid RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text +UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = to_timestamp($3), "updated_at" = to_timestamp($4) WHERE id = $5::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/asynk/queries_postgres/update_task_state.sql b/fang/src/asynk/queries_postgres/update_task_state.sql index 8801eee0..e41a533e 100644 --- a/fang/src/asynk/queries_postgres/update_task_state.sql +++ b/fang/src/asynk/queries_postgres/update_task_state.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = to_timestamp($2) WHERE id = $3::uuid RETURNING id::text , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at::text , created_at::text , updated_at::text +UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = to_timestamp($2) WHERE id = $3::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/blocking/sqlite_schema.rs b/fang/src/blocking/sqlite_schema.rs index fe2e2a04..602d1756 100644 --- a/fang/src/blocking/sqlite_schema.rs +++ b/fang/src/blocking/sqlite_schema.rs @@ -2,7 +2,7 @@ diesel::table! { fang_tasks (id) { - id -> Text, + id -> Binary, metadata -> Text, error_message -> Nullable, state -> Text,