diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..2e84004 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "rust-analyzer.linkedProjects": [ + "./fang/Cargo.toml", + ] +} \ No newline at end of file diff --git a/fang/src/asynk/async_queue.rs b/fang/src/asynk/async_queue.rs index c905ce8..5374fd3 100644 --- a/fang/src/asynk/async_queue.rs +++ b/fang/src/asynk/async_queue.rs @@ -13,17 +13,15 @@ use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; use cron::Schedule; -//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7 +use sqlx::any::AnyConnectOptions; +use sqlx::any::AnyKind; +#[cfg(any( + feature = "asynk-postgres", + feature = "asynk-mysql", + feature = "asynk-sqlite" +))] use sqlx::pool::PoolOptions; -use sqlx::Database; - -use sqlx::MySql; -use sqlx::MySqlPool; -use sqlx::Pool; -use sqlx::Postgres; -use sqlx::Sqlite; -use sqlx::SqlitePool; -use std::any::Any; +//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7 use std::str::FromStr; use thiserror::Error; use typed_builder::TypedBuilder; @@ -31,10 +29,16 @@ use uuid::Uuid; #[cfg(feature = "asynk-postgres")] use sqlx::PgPool; +#[cfg(feature = "asynk-postgres")] +use sqlx::Postgres; +#[cfg(feature = "asynk-mysql")] +use sqlx::MySql; #[cfg(feature = "asynk-mysql")] use sqlx::MySqlPool; +#[cfg(feature = "asynk-sqlite")] +use sqlx::Sqlite; #[cfg(feature = "asynk-sqlite")] use sqlx::SqlitePool; @@ -148,14 +152,48 @@ pub trait AsyncQueueable: Send { /// .build(); /// ``` /// +/// + +#[derive(Debug, Clone)] +pub(crate) enum InternalPool { + #[cfg(feature = "asynk-postgres")] + Pg(PgPool), + #[cfg(feature = "asynk-mysql")] + MySql(MySqlPool), + #[cfg(feature = "asynk-sqlite")] + Sqlite(SqlitePool), +} + +impl InternalPool { + #[cfg(feature = "asynk-postgres")] + pub(crate) fn unwrap_pg_pool(&self) -> &PgPool { + match self { + InternalPool::Pg(pool) => pool, + _ => panic!("Not a PgPool!"), + } + } + + #[cfg(feature = "asynk-mysql")] + pub(crate) fn unwrap_mysql_pool(&self) -> &MySqlPool { + match self { + InternalPool::MySql(pool) => pool, + _ => panic!("Not a MySqlPool!"), + } + } + + #[cfg(feature = "asynk-sqlite")] + pub(crate) fn unwrap_sqlite_pool(&self) -> &SqlitePool { + match self { + InternalPool::Sqlite(pool) => pool, + _ => panic!("Not a SqlitePool!"), + } + } +} #[derive(TypedBuilder, Debug, Clone)] -pub struct AsyncQueue -where - DB: Database, -{ +pub struct AsyncQueue { #[builder(default=None, setter(skip))] - pool: Option>, + pool: Option, #[builder(setter(into))] uri: String, #[builder(setter(into))] @@ -169,19 +207,19 @@ where #[cfg(test)] use tokio::sync::Mutex; -#[cfg(test)] +#[cfg(all(test, feature = "asynk-postgres"))] static ASYNC_QUEUE_POSTGRES_TEST_COUNTER: Mutex = Mutex::const_new(0); -#[cfg(test)] +#[cfg(all(test, feature = "asynk-sqlite"))] static ASYNC_QUEUE_SQLITE_TEST_COUNTER: Mutex = Mutex::const_new(0); -#[cfg(test)] +#[cfg(all(test, feature = "asynk-mysql"))] static ASYNC_QUEUE_MYSQL_TEST_COUNTER: Mutex = Mutex::const_new(0); #[cfg(test)] use sqlx::Executor; -#[cfg(test)] +#[cfg(all(test, feature = "asynk-sqlite"))] use std::path::Path; #[cfg(test)] @@ -189,29 +227,44 @@ use std::env; use super::backend_sqlx::BackendSqlX; -fn get_backend<'a, DB: Database>(pool: &'a Pool) -> BackendSqlX { - let type_pool = pool.type_id(); - #[cfg(feature = "asynk-postgres")] - if std::any::TypeId::of::() == type_pool { - return BackendSqlX::Pg; - } - #[cfg(feature = "asynk-mysql")] - if std::any::TypeId::of::() == type_pool { - return BackendSqlX::MySql; - } - - #[cfg(feature = "asynk-sqlite")] - if std::any::TypeId::of::() == type_pool { - return BackendSqlX::Sqlite; +async fn get_backend( + kind: AnyKind, + _uri: &str, + _max_connections: u32, +) -> Result<(BackendSqlX, InternalPool), AsyncQueueError> { + match kind { + #[cfg(feature = "asynk-postgres")] + AnyKind::Postgres => { + let pool = PoolOptions::::new() + .max_connections(_max_connections) + .connect(_uri) + .await?; + + Ok((BackendSqlX::Pg, InternalPool::Pg(pool))) + } + #[cfg(feature = "asynk-mysql")] + AnyKind::MySql => { + let pool = PoolOptions::::new() + .max_connections(_max_connections) + .connect(_uri) + .await?; + + Ok((BackendSqlX::MySql, InternalPool::MySql(pool))) + } + #[cfg(feature = "asynk-sqlite")] + AnyKind::Sqlite => { + let pool = PoolOptions::::new() + .max_connections(_max_connections) + .connect(_uri) + .await?; + + Ok((BackendSqlX::Sqlite, InternalPool::Sqlite(pool))) + } + _ => panic!("Not a valid backend"), } - - unreachable!() } -impl AsyncQueue -where - DB: Database, -{ +impl AsyncQueue { /// Check if the connection with db is established pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> { if self.connected { @@ -225,20 +278,18 @@ where pub async fn connect(&mut self) -> Result<(), AsyncQueueError> { //install_default_drivers(); - let pool: Pool = PoolOptions::new() - .max_connections(self.max_pool_size) - .connect(&self.uri) - .await?; + let kind: AnyKind = self.uri.parse::()?.kind(); - self.backend = get_backend(&pool); + let (backend, pool) = get_backend(kind, &self.uri, self.max_pool_size).await?; self.pool = Some(pool); + self.backend = backend; self.connected = true; Ok(()) } async fn fetch_and_touch_task_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, task_type: Option, ) -> Result, AsyncQueueError> { @@ -274,7 +325,7 @@ where } async fn insert_task_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, metadata: &serde_json::Value, task_type: &str, @@ -287,7 +338,7 @@ where .build(); let task = backend - .execute_query(SqlXQuery::InsertTask, pool, query_params) + .execute_query(SqlXQuery::InsertTask, &pool, query_params) .await? .unwrap_task(); @@ -295,7 +346,7 @@ where } async fn insert_task_if_not_exist_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, metadata: &serde_json::Value, task_type: &str, @@ -308,7 +359,7 @@ where .build(); let task = backend - .execute_query(SqlXQuery::InsertTaskIfNotExists, pool, query_params) + .execute_query(SqlXQuery::InsertTaskIfNotExists, &pool, query_params) .await? .unwrap_task(); @@ -316,7 +367,7 @@ where } async fn schedule_task_query( - pool: &Pool, + pool: &InternalPool, backend: &BackendSqlX, task: &dyn AsyncRunnable, ) -> Result { @@ -358,10 +409,7 @@ where } #[async_trait] -impl AsyncQueueable for AsyncQueue -where - DB: Database, -{ +impl AsyncQueueable for AsyncQueue { async fn find_task_by_id(&mut self, id: &Uuid) -> Result { self.check_if_connection()?; let pool = self.pool.as_ref().unwrap(); @@ -580,8 +628,8 @@ where } } -#[cfg(test)] -impl AsyncQueue { +#[cfg(all(test, feature = "asynk-postgres"))] +impl AsyncQueue { /// Provides an AsyncQueue connected to its own DB pub async fn test_postgres() -> Self { dotenvy::dotenv().expect(".env file not found"); @@ -602,7 +650,14 @@ impl AsyncQueue { let create_query: &str = &format!("CREATE DATABASE {} WITH TEMPLATE fang;", db_name); let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name); - let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap(); + let mut conn = res + .pool + .as_mut() + .unwrap() + .unwrap_pg_pool() + .acquire() + .await + .unwrap(); log::info!("Deleting database {db_name} ..."); conn.execute(delete_query).await.unwrap(); @@ -629,8 +684,8 @@ impl AsyncQueue { } } -#[cfg(test)] -impl AsyncQueue { +#[cfg(all(test, feature = "asynk-sqlite"))] +impl AsyncQueue { /// Provides an AsyncQueue connected to its own DB pub async fn test_sqlite() -> Self { dotenvy::dotenv().expect(".env file not found"); @@ -664,8 +719,8 @@ impl AsyncQueue { } } -#[cfg(test)] -impl AsyncQueue { +#[cfg(all(test, feature = "asynk-mysql"))] +impl AsyncQueue { /// Provides an AsyncQueue connected to its own DB pub async fn test_mysql() -> Self { dotenvy::dotenv().expect(".env file not found"); @@ -690,7 +745,14 @@ impl AsyncQueue { let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name); - let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap(); + let mut conn = res + .pool + .as_mut() + .unwrap() + .unwrap_mysql_pool() + .acquire() + .await + .unwrap(); log::info!("Deleting database {db_name} ..."); conn.execute(delete_query).await.unwrap(); @@ -717,11 +779,11 @@ impl AsyncQueue { } } -#[cfg(test)] -test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()} +#[cfg(all(test, feature = "asynk-postgres"))] +test_asynk_queue! {postgres, crate::AsyncQueue,crate::AsyncQueue::test_postgres()} -#[cfg(test)] -test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()} +#[cfg(all(test, feature = "asynk-sqlite"))] +test_asynk_queue! {sqlite, crate::AsyncQueue,crate::AsyncQueue::test_sqlite()} -#[cfg(test)] -test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()} +#[cfg(all(test, feature = "asynk-mysql"))] +test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()} diff --git a/fang/src/asynk/async_queue/async_queue_tests.rs b/fang/src/asynk/async_queue/async_queue_tests.rs index fa9e42b..62e836b 100644 --- a/fang/src/asynk/async_queue/async_queue_tests.rs +++ b/fang/src/asynk/async_queue/async_queue_tests.rs @@ -113,7 +113,7 @@ macro_rules! test_asynk_queue { } #[tokio::test] - async fn failed_task_query_test() { + async fn failed_task_test() { let mut test: $q = $e.await; let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap(); diff --git a/fang/src/asynk/async_worker.rs b/fang/src/asynk/async_worker.rs index 8f6b727..7c73227 100644 --- a/fang/src/asynk/async_worker.rs +++ b/fang/src/asynk/async_worker.rs @@ -263,7 +263,6 @@ mod async_worker_tests { use chrono::Duration; use chrono::Utc; use serde::{Deserialize, Serialize}; - use sqlx::Database; #[derive(Serialize, Deserialize)] struct WorkerAsyncTask { @@ -564,10 +563,7 @@ mod async_worker_tests { assert_eq!(id2, task2.id); } - async fn insert_task( - test: &mut AsyncQueue, - task: &dyn AsyncRunnable, - ) -> Task { + async fn insert_task(test: &mut AsyncQueue, task: &dyn AsyncRunnable) -> Task { test.insert_task(task).await.unwrap() } diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index 6986960..10e4dae 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -1,9 +1,15 @@ use chrono::{DateTime, Duration, Utc}; use sha2::Digest; use sha2::Sha256; -use sqlx::Any; +use sqlx::any::AnyQueryResult; +use sqlx::database::HasArguments; use sqlx::Database; +use sqlx::Encode; +use sqlx::Executor; +use sqlx::FromRow; +use sqlx::IntoArguments; use sqlx::Pool; +use sqlx::Type; use std::fmt::Debug; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -36,6 +42,7 @@ pub(crate) enum BackendSqlX { NoBackend, } +#[allow(dead_code)] #[derive(TypedBuilder, Clone)] pub(crate) struct QueryParams<'a> { #[builder(default, setter(strip_option))] @@ -81,19 +88,25 @@ impl Res { } impl BackendSqlX { - pub(crate) async fn execute_query<'a, DB: Database>( + pub(crate) async fn execute_query( &self, _query: SqlXQuery, - _pool: &Pool, + _pool: &InternalPool, _params: QueryParams<'_>, ) -> Result { match self { #[cfg(feature = "asynk-postgres")] - BackendSqlX::Pg => BackendSqlXPg::execute_query(_query, _pool, _params).await, + BackendSqlX::Pg => { + BackendSqlXPg::execute_query(_query, _pool.unwrap_pg_pool(), _params).await + } #[cfg(feature = "asynk-sqlite")] - BackendSqlX::Sqlite => BackendSqlXSQLite::execute_query(_query, _pool, _params).await, + BackendSqlX::Sqlite => { + BackendSqlXSQLite::execute_query(_query, _pool.unwrap_sqlite_pool(), _params).await + } #[cfg(feature = "asynk-mysql")] - BackendSqlX::MySql => BackendSqlXMySQL::execute_query(_query, _pool, _params).await, + BackendSqlX::MySql => { + BackendSqlXMySQL::execute_query(_query, _pool.unwrap_mysql_pool(), _params).await + } _ => unreachable!(), } } @@ -134,46 +147,9 @@ pub(crate) enum SqlXQuery { use crate::AsyncQueueError; use crate::AsyncRunnable; use crate::FangTaskState; +use crate::InternalPool; use crate::Task; -#[allow(dead_code)] -async fn general_any_impl_insert_task_if_not_exists( - queries: (&str, &str), - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - match general_any_impl_find_task_by_uniq_hash(queries.0, pool, ¶ms).await { - Some(task) => Ok(task), - None => general_any_impl_insert_task_uniq(queries.1, pool, params).await, - } -} - -#[allow(dead_code)] -async fn general_any_impl_insert_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let metadata_str = params.metadata.unwrap().to_string(); - let task_type = params.task_type.unwrap(); - - let task: Task = sqlx::query_as(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(scheduled_at_str) - .fetch_one(pool) - .await?; - - Ok(task) -} - #[allow(dead_code)] pub(crate) fn calculate_hash(json: &str) -> String { let mut hasher = Sha256::new(); @@ -182,258 +158,304 @@ pub(crate) fn calculate_hash(json: &str) -> String { hex::encode(result) } -#[allow(dead_code)] -async fn general_any_impl_insert_task_uniq( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let metadata = params.metadata.unwrap(); - - let metadata_str = metadata.to_string(); - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let task_type = params.task_type.unwrap(); - - let uniq_hash = calculate_hash(&metadata_str); - - let task: Task = sqlx::query_as(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(uniq_hash) - .bind(scheduled_at_str) - .fetch_one(pool) - .await?; - Ok(task) -} +trait FangQueryable +where + DB: Database, + for<'r> Task: FromRow<'r, ::Row>, + for<'r> std::string::String: Encode<'r, DB> + Type, + for<'r> &'r str: Encode<'r, DB> + Type, + for<'r> i32: Encode<'r, DB> + Type, + for<'r> &'r Pool: Executor<'r, Database = DB>, + for<'r> >::Arguments: IntoArguments<'r, DB>, + ::QueryResult: Into, +{ + async fn fetch_task_type( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let task_type = params.task_type.unwrap(); + + let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); + + let task: Task = sqlx::query_as(query) + .bind(task_type) + .bind(now_str) + .fetch_one(pool) + .await?; + + Ok(task) + } -#[allow(dead_code)] -async fn general_any_impl_update_task_state( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); + async fn find_task_by_uniq_hash( + query: &str, + pool: &Pool, + params: &QueryParams<'_>, + ) -> Option { + let metadata = params.metadata.unwrap(); - let state_str: &str = params.state.unwrap().into(); + let uniq_hash = calculate_hash(&metadata.to_string()); - let uuid = params.uuid.unwrap(); + sqlx::query_as(query) + .bind(uniq_hash) + .fetch_one(pool) + .await + .ok() + } - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); + async fn find_task_by_id( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .uuid + .unwrap() + .as_hyphenated() + .encode_lower(&mut buffer); + + let task: Task = sqlx::query_as(query) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(task) + } - let task: Task = sqlx::query_as(query) - .bind(state_str) - .bind(updated_at_str) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; + async fn retry_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let now = Utc::now(); + let now_str = format!("{}", now.format("%F %T%.f+00")); + + let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); + let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); + let retries = params.task.unwrap().retries + 1; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .task + .unwrap() + .id + .as_hyphenated() + .encode_lower(&mut buffer); + + let error = params.error_message.unwrap(); + + let failed_task: Task = sqlx::query_as(query) + .bind(error) + .bind(retries) + .bind(scheduled_at_str) + .bind(now_str) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(failed_task) + } - Ok(task) -} + async fn insert_task_uniq( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let metadata = params.metadata.unwrap(); + + let metadata_str = metadata.to_string(); + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let task_type = params.task_type.unwrap(); + + let uniq_hash = calculate_hash(&metadata_str); + + let task: Task = sqlx::query_as(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(uniq_hash) + .bind(scheduled_at_str) + .fetch_one(pool) + .await?; + Ok(task) + } -#[allow(dead_code)] -async fn general_any_impl_fail_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); + async fn insert_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let metadata_str = params.metadata.unwrap().to_string(); + let task_type = params.task_type.unwrap(); + + let task: Task = sqlx::query_as(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(scheduled_at_str) + .fetch_one(pool) + .await?; + + Ok(task) + } - let id = params.task.unwrap().id; + async fn update_task_state( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + let state_str: &str = params.state.unwrap().into(); - let error_message = params.error_message.unwrap(); + let uuid = params.uuid.unwrap(); - let failed_task: Task = sqlx::query_as(query) - .bind(<&str>::from(FangTaskState::Failed)) - .bind(error_message) - .bind(updated_at) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - Ok(failed_task) -} + let task: Task = sqlx::query_as(query) + .bind(state_str) + .bind(updated_at_str) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; -#[allow(dead_code)] -async fn general_any_impl_remove_all_task( - query: &str, - pool: &Pool, -) -> Result { - Ok(sqlx::query(query).execute(pool).await?.rows_affected()) -} + Ok(task) + } -#[allow(dead_code)] -async fn general_any_impl_remove_all_scheduled_tasks( - query: &str, - pool: &Pool, -) -> Result { - let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); - - Ok(sqlx::query(query) - .bind(now_str) - .execute(pool) - .await? - .rows_affected()) -} + async fn fail_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); -#[allow(dead_code)] -async fn general_any_impl_remove_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .uuid - .unwrap() - .as_hyphenated() - .encode_lower(&mut buffer); - - let result = sqlx::query(query) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if result != 1 { - Err(AsyncQueueError::ResultError { - expected: 1, - found: result, - }) - } else { - Ok(result) + let id = params.task.unwrap().id; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + + let error_message = params.error_message.unwrap(); + + let failed_task: Task = sqlx::query_as(query) + .bind(<&str>::from(FangTaskState::Failed)) + .bind(error_message) + .bind(updated_at) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(failed_task) } -} -#[allow(dead_code)] -async fn general_any_impl_remove_task_by_metadata( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let metadata = serde_json::to_value(params.runnable.unwrap())?; - - let uniq_hash = calculate_hash(&metadata.to_string()); - - Ok(sqlx::query(query) - .bind(uniq_hash) - .execute(pool) - .await? - .rows_affected()) -} + async fn remove_all_task(query: &str, pool: &Pool) -> Result { + // This converts QueryResult to AnyQueryResult and then to u64 + // do not delete into() method and do not delete Into trait bound + Ok(sqlx::query(query) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_remove_task_type( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let task_type = params.task_type.unwrap(); - - Ok(sqlx::query(query) - .bind(task_type) - .execute(pool) - .await? - .rows_affected()) -} + async fn remove_all_scheduled_tasks( + query: &str, + pool: &Pool, + ) -> Result { + let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); + + // This converts QueryResult to AnyQueryResult and then to u64 + // do not delete into() method and do not delete Into trait bound + + Ok(sqlx::query(query) + .bind(now_str) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_fetch_task_type( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let task_type = params.task_type.unwrap(); - - let now_str = format!("{}", Utc::now().format("%F %T%.f+00")); - - let task: Task = sqlx::query_as(query) - .bind(task_type) - .bind(now_str) - .fetch_one(pool) - .await?; - - Ok(task) -} + async fn remove_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .uuid + .unwrap() + .as_hyphenated() + .encode_lower(&mut buffer); + + let result = sqlx::query(query) + .bind(&*uuid_as_text) + .execute(pool) + .await? + .into() + .rows_affected(); + + if result != 1 { + Err(AsyncQueueError::ResultError { + expected: 1, + found: result, + }) + } else { + Ok(result) + } + } -#[allow(dead_code)] -async fn general_any_impl_find_task_by_uniq_hash( - query: &str, - pool: &Pool, - params: &QueryParams<'_>, -) -> Option { - let metadata = params.metadata.unwrap(); - - let uniq_hash = calculate_hash(&metadata.to_string()); - - sqlx::query_as(query) - .bind(uniq_hash) - .fetch_one(pool) - .await - .ok() -} + async fn remove_task_by_metadata( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let metadata = serde_json::to_value(params.runnable.unwrap())?; + + let uniq_hash = calculate_hash(&metadata.to_string()); + + Ok(sqlx::query(query) + .bind(uniq_hash) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_find_task_by_id( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .uuid - .unwrap() - .as_hyphenated() - .encode_lower(&mut buffer); - - let task: Task = sqlx::query_as(query) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; - - Ok(task) -} + async fn remove_task_type( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let task_type = params.task_type.unwrap(); + + Ok(sqlx::query(query) + .bind(task_type) + .execute(pool) + .await? + .into() + .rows_affected()) + } -#[allow(dead_code)] -async fn general_any_impl_retry_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let now = Utc::now(); - let now_str = format!("{}", now.format("%F %T%.f+00")); - - let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); - let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); - let retries = params.task.unwrap().retries + 1; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = params - .task - .unwrap() - .id - .as_hyphenated() - .encode_lower(&mut buffer); - - let error = params.error_message.unwrap(); - - let failed_task: Task = sqlx::query_as(query) - .bind(error) - .bind(retries) - .bind(scheduled_at_str) - .bind(now_str) - .bind(&*uuid_as_text) - .fetch_one(pool) - .await?; - - Ok(failed_task) + async fn insert_task_if_not_exists( + queries: (&str, &str), + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + match Self::find_task_by_uniq_hash(queries.0, pool, ¶ms).await { + Some(task) => Ok(task), + None => Self::insert_task_uniq(queries.1, pool, params).await, + } + } } diff --git a/fang/src/asynk/backend_sqlx/mysql.rs b/fang/src/asynk/backend_sqlx/mysql.rs index 6c49815..dba82d3 100644 --- a/fang/src/asynk/backend_sqlx/mysql.rs +++ b/fang/src/asynk/backend_sqlx/mysql.rs @@ -15,61 +15,370 @@ const FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL: &str = const FIND_TASK_BY_ID_QUERY_MYSQL: &str = include_str!("../queries_mysql/find_task_by_id.sql"); const RETRY_TASK_QUERY_MYSQL: &str = include_str!("../queries_mysql/retry_task.sql"); -use super::general_any_impl_fetch_task_type; -use super::general_any_impl_find_task_by_id; -use super::general_any_impl_find_task_by_uniq_hash; -use super::general_any_impl_remove_all_scheduled_tasks; -use super::general_any_impl_remove_all_task; -use super::general_any_impl_remove_task; -use super::general_any_impl_remove_task_by_metadata; -use super::general_any_impl_remove_task_type; -use super::{calculate_hash, QueryParams, Res, SqlXQuery}; -use crate::{AsyncQueueError, FangTaskState, Task}; -use SqlXQuery as Q; - use chrono::Duration; -use chrono::Utc; -use sqlx::{Any, Pool}; +use chrono::{DateTime, Utc}; +use sqlx::mysql::MySqlQueryResult; +use sqlx::mysql::MySqlRow; +use sqlx::FromRow; +use sqlx::MySql; +use sqlx::Pool; +use sqlx::Row; use uuid::Uuid; +use SqlXQuery as Q; + +use super::FangQueryable; +use super::{calculate_hash, QueryParams, Res, SqlXQuery}; +use crate::{AsyncQueueError, FangTaskState, Task}; #[derive(Debug, Clone)] -pub(crate) struct BackendSqlXMySQL {} +pub(super) struct BackendSqlXMySQL {} + +impl<'a> FromRow<'a, MySqlRow> for Task { + fn from_row(row: &'a MySqlRow) -> Result { + let uuid_as_text: &str = row.get("id"); + + let id = Uuid::parse_str(uuid_as_text).unwrap(); + + let raw: &str = row.get("metadata"); // will work if database cast json to string + let raw = raw.replace('\\', ""); + + // -- SELECT metadata->>'type' FROM fang_tasks ; this works because jsonb casting + let metadata: serde_json::Value = serde_json::from_str(&raw).unwrap(); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let error_message: Option = row.get("error_message"); + + let state_str: &str = row.get("state"); // will work if database cast json to string + + let state: FangTaskState = state_str.into(); + + let task_type: String = row.get("task_type"); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let uniq_hash: Option = row.get("uniq_hash"); + + let retries: i32 = row.get("retries"); + + let scheduled_at_str: &str = row.get("scheduled_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let created_at_str: &str = row.get("created_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let updated_at_str: &str = row.get("updated_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + Ok(Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .uniq_hash(uniq_hash) + .retries(retries) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build()) + } +} + +impl FangQueryable for BackendSqlXMySQL { + async fn insert_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let metadata_str = params.metadata.unwrap().to_string(); + let task_type = params.task_type.unwrap(); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(scheduled_at_str) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&uuid).build(); + + let task: Task = + Self::find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; + + Ok(task) + } + + async fn update_task_state( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); + + let state_str: &str = params.state.unwrap().into(); + + let uuid = params.uuid.unwrap(); + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(state_str) + .bind(updated_at_str) + .bind(&*uuid_as_text) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(params.uuid.unwrap()).build(); + + let task: Task = + Self::find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; + + Ok(task) + } + + async fn insert_task_uniq( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let uuid = Uuid::new_v4(); + let mut buffer = Uuid::encode_buffer(); + let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); + + let metadata = params.metadata.unwrap(); + + let metadata_str = metadata.to_string(); + let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); + + let task_type = params.task_type.unwrap(); + + let uniq_hash = calculate_hash(&metadata_str); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(uuid_as_str) + .bind(metadata_str) + .bind(task_type) + .bind(uniq_hash) + .bind(scheduled_at_str) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&uuid).build(); + + let task: Task = + Self::find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; + + Ok(task) + } + + async fn fail_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); + + let id = params.task.unwrap().id; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); + + let error_message = params.error_message.unwrap(); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(<&str>::from(FangTaskState::Failed)) + .bind(error_message) + .bind(updated_at) + .bind(&*uuid_as_text) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&id).build(); + + let failed_task: Task = + Self::find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; + + Ok(failed_task) + } + + async fn retry_task( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let now = Utc::now(); + let now_str = format!("{}", now.format("%F %T%.f+00")); + + let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); + let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); + let retries = params.task.unwrap().retries + 1; + + let uuid = params.task.unwrap().id; + + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); + + let error = params.error_message.unwrap(); + + let affected_rows = Into::::into( + sqlx::query(query) + .bind(error) + .bind(retries) + .bind(scheduled_at_str) + .bind(now_str) + .bind(&*uuid_as_text) + .execute(pool) + .await?, + ) + .rows_affected(); + + if affected_rows != 1 { + return Err(AsyncQueueError::ResultError { + expected: 1, + found: affected_rows, + }); + } + + let query_params = QueryParams::builder().uuid(&uuid).build(); + + let failed_task: Task = + Self::find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; + + Ok(failed_task) + } + + async fn insert_task_if_not_exists( + queries: (&str, &str), + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + match Self::find_task_by_uniq_hash(queries.0, pool, ¶ms).await { + Some(task) => Ok(task), + None => Self::insert_task_uniq(queries.1, pool, params).await, + } + } + + async fn find_task_by_id( + query: &str, + pool: &Pool, + params: QueryParams<'_>, + ) -> Result { + let mut buffer = Uuid::encode_buffer(); + let uuid_as_text = params + .uuid + .unwrap() + .as_hyphenated() + .encode_lower(&mut buffer); + + let task: Task = sqlx::query_as(query) + .bind(&*uuid_as_text) + .fetch_one(pool) + .await?; + + Ok(task) + } +} impl BackendSqlXMySQL { pub(super) async fn execute_query( query: SqlXQuery, - pool: &Pool, + pool: &Pool, params: QueryParams<'_>, ) -> Result { match query { Q::InsertTask => { - let task = mysql_impl_insert_task(INSERT_TASK_QUERY_MYSQL, pool, params).await?; + let task = >::insert_task( + INSERT_TASK_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::UpdateTaskState => { - let task = - mysql_impl_update_task_state(UPDATE_TASK_STATE_QUERY_MYSQL, pool, params) - .await?; + let task = >::update_task_state( + UPDATE_TASK_STATE_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FailTask => { - let task = mysql_impl_fail_task(FAIL_TASK_QUERY_MYSQL, pool, params).await?; + let task = >::fail_task( + FAIL_TASK_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RemoveAllTask => { - let affected_rows = - general_any_impl_remove_all_task(REMOVE_ALL_TASK_QUERY_MYSQL, pool).await?; - - Ok(Res::Bigint(affected_rows)) - } - - Q::RemoveAllScheduledTask => { - let affected_rows = general_any_impl_remove_all_scheduled_tasks( - REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL, + let affected_rows = >::remove_all_task( + REMOVE_ALL_TASK_QUERY_MYSQL, pool, ) .await?; @@ -77,15 +386,20 @@ impl BackendSqlXMySQL { Ok(Res::Bigint(affected_rows)) } - Q::RemoveTask => { + Q::RemoveAllScheduledTask => { let affected_rows = - general_any_impl_remove_task(REMOVE_TASK_QUERY_MYSQL, pool, params).await?; + >::remove_all_scheduled_tasks( + REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL, + pool, + ) + .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskByMetadata => { - let affected_rows = general_any_impl_remove_task_by_metadata( - REMOVE_TASK_BY_METADATA_QUERY_MYSQL, + + Q::RemoveTask => { + let affected_rows = >::remove_task( + REMOVE_TASK_QUERY_MYSQL, pool, params, ) @@ -93,33 +407,58 @@ impl BackendSqlXMySQL { Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskType => { + Q::RemoveTaskByMetadata => { let affected_rows = - general_any_impl_remove_task_type(REMOVE_TASKS_TYPE_QUERY_MYSQL, pool, params) - .await?; + >::remove_task_by_metadata( + REMOVE_TASK_BY_METADATA_QUERY_MYSQL, + pool, + params, + ) + .await?; + + Ok(Res::Bigint(affected_rows)) + } + Q::RemoveTaskType => { + let affected_rows = >::remove_task_type( + REMOVE_TASKS_TYPE_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Bigint(affected_rows)) } Q::FetchTaskType => { - let task = - general_any_impl_fetch_task_type(FETCH_TASK_TYPE_QUERY_MYSQL, pool, params) - .await?; + let task = >::fetch_task_type( + FETCH_TASK_TYPE_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FindTaskById => { - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, params) - .await?; + let task: Task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RetryTask => { - let task = mysql_impl_retry_task(RETRY_TASK_QUERY_MYSQL, pool, params).await?; + let task = >::retry_task( + RETRY_TASK_QUERY_MYSQL, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::InsertTaskIfNotExists => { - let task = mysql_any_impl_insert_task_if_not_exists( + let task = >::insert_task_if_not_exists( ( FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL, INSERT_TASK_UNIQ_QUERY_MYSQL, @@ -138,214 +477,3 @@ impl BackendSqlXMySQL { "MySQL" } } - -async fn mysql_impl_insert_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let metadata_str = params.metadata.unwrap().to_string(); - let task_type = params.task_type.unwrap(); - - let affected_rows = sqlx::query(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(scheduled_at_str) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&uuid).build(); - - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(task) -} - -async fn mysql_impl_insert_task_uniq( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let uuid = Uuid::new_v4(); - let mut buffer = Uuid::encode_buffer(); - let uuid_as_str: &str = uuid.as_hyphenated().encode_lower(&mut buffer); - - let metadata = params.metadata.unwrap(); - - let metadata_str = metadata.to_string(); - let scheduled_at_str = format!("{}", params.scheduled_at.unwrap().format("%F %T%.f+00")); - - let task_type = params.task_type.unwrap(); - - let uniq_hash = calculate_hash(&metadata_str); - - let affected_rows = sqlx::query(query) - .bind(uuid_as_str) - .bind(metadata_str) - .bind(task_type) - .bind(uniq_hash) - .bind(scheduled_at_str) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&uuid).build(); - - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(task) -} - -async fn mysql_impl_update_task_state( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00")); - - let state_str: &str = params.state.unwrap().into(); - - let uuid = params.uuid.unwrap(); - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - - let affected_rows = sqlx::query(query) - .bind(state_str) - .bind(updated_at_str) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(params.uuid.unwrap()).build(); - - let task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(task) -} - -async fn mysql_impl_fail_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let updated_at = format!("{}", Utc::now().format("%F %T%.f+00")); - - let id = params.task.unwrap().id; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = id.as_hyphenated().encode_lower(&mut buffer); - - let error_message = params.error_message.unwrap(); - - let affected_rows = sqlx::query(query) - .bind(<&str>::from(FangTaskState::Failed)) - .bind(error_message) - .bind(updated_at) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&id).build(); - - let failed_task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(failed_task) -} - -async fn mysql_impl_retry_task( - query: &str, - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - let now = Utc::now(); - let now_str = format!("{}", now.format("%F %T%.f+00")); - - let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); - let scheduled_at_str = format!("{}", scheduled_at.format("%F %T%.f+00")); - let retries = params.task.unwrap().retries + 1; - - let uuid = params.task.unwrap().id; - - let mut buffer = Uuid::encode_buffer(); - let uuid_as_text = uuid.as_hyphenated().encode_lower(&mut buffer); - - let error = params.error_message.unwrap(); - - let affected_rows = sqlx::query(query) - .bind(error) - .bind(retries) - .bind(scheduled_at_str) - .bind(now_str) - .bind(&*uuid_as_text) - .execute(pool) - .await? - .rows_affected(); - - if affected_rows != 1 { - return Err(AsyncQueueError::ResultError { - expected: 1, - found: affected_rows, - }); - } - - let query_params = QueryParams::builder().uuid(&uuid).build(); - - let failed_task: Task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_MYSQL, pool, query_params).await?; - - Ok(failed_task) -} - -async fn mysql_any_impl_insert_task_if_not_exists( - queries: (&str, &str), - pool: &Pool, - params: QueryParams<'_>, -) -> Result { - match general_any_impl_find_task_by_uniq_hash(queries.0, pool, ¶ms).await { - Some(task) => Ok(task), - None => mysql_impl_insert_task_uniq(queries.1, pool, params).await, - } -} diff --git a/fang/src/asynk/backend_sqlx/postgres.rs b/fang/src/asynk/backend_sqlx/postgres.rs index 0535f3f..5afe304 100644 --- a/fang/src/asynk/backend_sqlx/postgres.rs +++ b/fang/src/asynk/backend_sqlx/postgres.rs @@ -24,41 +24,105 @@ const RETRY_TASK_QUERY_POSTGRES: &str = include_str!("../queries_postgres/retry_ #[derive(Debug, Clone)] pub(super) struct BackendSqlXPg {} -use sqlx::Database; +use chrono::DateTime; +use chrono::Utc; +use sqlx::postgres::PgRow; +use sqlx::FromRow; +use sqlx::Pool; +use sqlx::Postgres; +use sqlx::Row; +use uuid::Uuid; use SqlXQuery as Q; +use super::FangQueryable; +use super::{QueryParams, Res, SqlXQuery}; use crate::AsyncQueueError; +use crate::FangTaskState; +use crate::Task; -use super::general_any_impl_fail_task; -use super::general_any_impl_fetch_task_type; -use super::general_any_impl_find_task_by_id; -use super::general_any_impl_insert_task; -use super::general_any_impl_insert_task_if_not_exists; -use super::general_any_impl_remove_all_scheduled_tasks; -use super::general_any_impl_remove_all_task; -use super::general_any_impl_remove_task; -use super::general_any_impl_remove_task_by_metadata; -use super::general_any_impl_remove_task_type; -use super::general_any_impl_retry_task; -use super::general_any_impl_update_task_state; -use super::{QueryParams, Res, SqlXQuery}; -use sqlx::Pool; +impl<'a> FromRow<'a, PgRow> for Task { + fn from_row(row: &'a PgRow) -> Result { + let uuid_as_text: &str = row.get("id"); + + let id = Uuid::parse_str(uuid_as_text).unwrap(); + + let raw: &str = row.get("metadata"); // will work if database cast json to string + let raw = raw.replace('\\', ""); + + // -- SELECT metadata->>'type' FROM fang_tasks ; this works because jsonb casting + let metadata: serde_json::Value = serde_json::from_str(&raw).unwrap(); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let error_message: Option = row.get("error_message"); + + let state_str: &str = row.get("state"); // will work if database cast json to string + + let state: FangTaskState = state_str.into(); + + let task_type: String = row.get("task_type"); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let uniq_hash: Option = row.get("uniq_hash"); + + let retries: i32 = row.get("retries"); + + let scheduled_at_str: &str = row.get("scheduled_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let created_at_str: &str = row.get("created_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let updated_at_str: &str = row.get("updated_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + Ok(Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .uniq_hash(uniq_hash) + .retries(retries) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build()) + } +} + +impl FangQueryable for BackendSqlXPg {} impl BackendSqlXPg { - pub(super) async fn execute_query( + pub(super) async fn execute_query( query: SqlXQuery, - pool: &Pool, + pool: &Pool, params: QueryParams<'_>, ) -> Result { match query { Q::InsertTask => { - let task = - general_any_impl_insert_task(INSERT_TASK_QUERY_POSTGRES, pool, params).await?; + let task = >::insert_task( + INSERT_TASK_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::UpdateTaskState => { - let task = general_any_impl_update_task_state( + let task = >::update_task_state( UPDATE_TASK_STATE_QUERY_POSTGRES, pool, params, @@ -67,35 +131,37 @@ impl BackendSqlXPg { Ok(Res::Task(task)) } Q::FailTask => { - let task = - general_any_impl_fail_task(FAIL_TASK_QUERY_POSTGRES, pool, params).await?; + let task = >::fail_task( + FAIL_TASK_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RemoveAllTask => { - let affected_rows = - general_any_impl_remove_all_task(REMOVE_ALL_TASK_QUERY_POSTGRES, pool).await?; - - Ok(Res::Bigint(affected_rows)) - } - Q::RemoveAllScheduledTask => { - let affected_rows = general_any_impl_remove_all_scheduled_tasks( - REMOVE_ALL_SCHEDULED_TASK_QUERY_POSTGRES, + let affected_rows = >::remove_all_task( + REMOVE_ALL_TASK_QUERY_POSTGRES, pool, ) .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTask => { + Q::RemoveAllScheduledTask => { let affected_rows = - general_any_impl_remove_task(REMOVE_TASK_QUERY_POSTGRES, pool, params).await?; + >::remove_all_scheduled_tasks( + REMOVE_ALL_SCHEDULED_TASK_QUERY_POSTGRES, + pool, + ) + .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskByMetadata => { - let affected_rows = general_any_impl_remove_task_by_metadata( - REMOVE_TASK_BY_METADATA_QUERY_POSTGRES, + Q::RemoveTask => { + let affected_rows = >::remove_task( + REMOVE_TASK_QUERY_POSTGRES, pool, params, ) @@ -103,8 +169,19 @@ impl BackendSqlXPg { Ok(Res::Bigint(affected_rows)) } + Q::RemoveTaskByMetadata => { + let affected_rows = + >::remove_task_by_metadata( + REMOVE_TASK_BY_METADATA_QUERY_POSTGRES, + pool, + params, + ) + .await?; + + Ok(Res::Bigint(affected_rows)) + } Q::RemoveTaskType => { - let affected_rows = general_any_impl_remove_task_type( + let affected_rows = >::remove_task_type( REMOVE_TASKS_TYPE_QUERY_POSTGRES, pool, params, @@ -114,25 +191,35 @@ impl BackendSqlXPg { Ok(Res::Bigint(affected_rows)) } Q::FetchTaskType => { - let task = - general_any_impl_fetch_task_type(FETCH_TASK_TYPE_QUERY_POSTGRES, pool, params) - .await?; + let task = >::fetch_task_type( + FETCH_TASK_TYPE_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FindTaskById => { - let task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_POSTGRES, pool, params) - .await?; + let task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RetryTask => { - let task = - general_any_impl_retry_task(RETRY_TASK_QUERY_POSTGRES, pool, params).await?; + let task = >::retry_task( + RETRY_TASK_QUERY_POSTGRES, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::InsertTaskIfNotExists => { - let task = general_any_impl_insert_task_if_not_exists( + let task = >::insert_task_if_not_exists( ( FIND_TASK_BY_UNIQ_HASH_QUERY_POSTGRES, INSERT_TASK_UNIQ_QUERY_POSTGRES, diff --git a/fang/src/asynk/backend_sqlx/sqlite.rs b/fang/src/asynk/backend_sqlx/sqlite.rs index af37a34..430c960 100644 --- a/fang/src/asynk/backend_sqlx/sqlite.rs +++ b/fang/src/asynk/backend_sqlx/sqlite.rs @@ -20,39 +20,103 @@ const RETRY_TASK_QUERY_SQLITE: &str = include_str!("../queries_sqlite/retry_task #[derive(Debug, Clone)] pub(super) struct BackendSqlXSQLite {} +use super::FangQueryable; use super::{QueryParams, Res, SqlXQuery}; use crate::AsyncQueueError; -use sqlx::{Any, Pool}; +use crate::FangTaskState; +use crate::Task; +use chrono::{DateTime, Utc}; +use sqlx::sqlite::SqliteRow; +use sqlx::FromRow; +use sqlx::Pool; +use sqlx::Row; +use sqlx::Sqlite; +use uuid::Uuid; use SqlXQuery as Q; -use super::general_any_impl_fail_task; -use super::general_any_impl_fetch_task_type; -use super::general_any_impl_find_task_by_id; -use super::general_any_impl_insert_task; -use super::general_any_impl_insert_task_if_not_exists; -use super::general_any_impl_remove_all_scheduled_tasks; -use super::general_any_impl_remove_all_task; -use super::general_any_impl_remove_task; -use super::general_any_impl_remove_task_by_metadata; -use super::general_any_impl_remove_task_type; -use super::general_any_impl_retry_task; -use super::general_any_impl_update_task_state; +impl<'a> FromRow<'a, SqliteRow> for Task { + fn from_row(row: &'a SqliteRow) -> Result { + let uuid_as_text: &str = row.get("id"); + + let id = Uuid::parse_str(uuid_as_text).unwrap(); + + let raw: &str = row.get("metadata"); // will work if database cast json to string + let raw = raw.replace('\\', ""); + + // -- SELECT metadata->>'type' FROM fang_tasks ; this works because jsonb casting + let metadata: serde_json::Value = serde_json::from_str(&raw).unwrap(); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let error_message: Option = row.get("error_message"); + + let state_str: &str = row.get("state"); // will work if database cast json to string + + let state: FangTaskState = state_str.into(); + + let task_type: String = row.get("task_type"); + + // Be careful with this if we update sqlx, https://github.com/launchbadge/sqlx/issues/2416 + let uniq_hash: Option = row.get("uniq_hash"); + + let retries: i32 = row.get("retries"); + + let scheduled_at_str: &str = row.get("scheduled_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let scheduled_at: DateTime = DateTime::parse_from_str(scheduled_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let created_at_str: &str = row.get("created_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let created_at: DateTime = DateTime::parse_from_str(created_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + let updated_at_str: &str = row.get("updated_at"); + + // This unwrap is safe because we know that the database returns the date in the correct format + let updated_at: DateTime = DateTime::parse_from_str(updated_at_str, "%F %T%.f%#z") + .unwrap() + .into(); + + Ok(Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .uniq_hash(uniq_hash) + .retries(retries) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build()) + } +} + +impl FangQueryable for BackendSqlXSQLite {} impl BackendSqlXSQLite { pub(super) async fn execute_query( query: SqlXQuery, - pool: &Pool, + pool: &Pool, params: QueryParams<'_>, ) -> Result { match query { Q::InsertTask => { - let task = - general_any_impl_insert_task(INSERT_TASK_QUERY_SQLITE, pool, params).await?; + let task = >::insert_task( + INSERT_TASK_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::UpdateTaskState => { - let task = general_any_impl_update_task_state( + let task = >::update_task_state( UPDATE_TASK_STATE_QUERY_SQLITE, pool, params, @@ -61,34 +125,37 @@ impl BackendSqlXSQLite { Ok(Res::Task(task)) } Q::FailTask => { - let task = general_any_impl_fail_task(FAIL_TASK_QUERY_SQLITE, pool, params).await?; + let task = >::fail_task( + FAIL_TASK_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RemoveAllTask => { - let affected_rows = - general_any_impl_remove_all_task(REMOVE_ALL_TASK_QUERY_SQLITE, pool).await?; - - Ok(Res::Bigint(affected_rows)) - } - Q::RemoveAllScheduledTask => { - let affected_rows = general_any_impl_remove_all_scheduled_tasks( - REMOVE_ALL_SCHEDULED_TASK_QUERY_SQLITE, + let affected_rows = >::remove_all_task( + REMOVE_ALL_TASK_QUERY_SQLITE, pool, ) .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTask => { + Q::RemoveAllScheduledTask => { let affected_rows = - general_any_impl_remove_task(REMOVE_TASK_QUERY_SQLITE, pool, params).await?; + >::remove_all_scheduled_tasks( + REMOVE_ALL_SCHEDULED_TASK_QUERY_SQLITE, + pool, + ) + .await?; Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskByMetadata => { - let affected_rows = general_any_impl_remove_task_by_metadata( - REMOVE_TASK_BY_METADATA_QUERY_SQLITE, + Q::RemoveTask => { + let affected_rows = >::remove_task( + REMOVE_TASK_QUERY_SQLITE, pool, params, ) @@ -96,33 +163,57 @@ impl BackendSqlXSQLite { Ok(Res::Bigint(affected_rows)) } - Q::RemoveTaskType => { + Q::RemoveTaskByMetadata => { let affected_rows = - general_any_impl_remove_task_type(REMOVE_TASKS_TYPE_QUERY_SQLITE, pool, params) - .await?; + >::remove_task_by_metadata( + REMOVE_TASK_BY_METADATA_QUERY_SQLITE, + pool, + params, + ) + .await?; + + Ok(Res::Bigint(affected_rows)) + } + Q::RemoveTaskType => { + let affected_rows = >::remove_task_type( + REMOVE_TASKS_TYPE_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Bigint(affected_rows)) } Q::FetchTaskType => { - let task = - general_any_impl_fetch_task_type(FETCH_TASK_TYPE_QUERY_SQLITE, pool, params) - .await?; + let task = >::fetch_task_type( + FETCH_TASK_TYPE_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::FindTaskById => { - let task = - general_any_impl_find_task_by_id(FIND_TASK_BY_ID_QUERY_SQLITE, pool, params) - .await?; + let task = >::find_task_by_id( + FIND_TASK_BY_ID_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::RetryTask => { - let task = - general_any_impl_retry_task(RETRY_TASK_QUERY_SQLITE, pool, params).await?; + let task = >::retry_task( + RETRY_TASK_QUERY_SQLITE, + pool, + params, + ) + .await?; Ok(Res::Task(task)) } Q::InsertTaskIfNotExists => { - let task = general_any_impl_insert_task_if_not_exists( + let task = >::insert_task_if_not_exists( ( FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE, INSERT_TASK_UNIQ_QUERY_SQLITE,