From 67bf9d01fb1ab0bbf734815ac44c59f6771a0707 Mon Sep 17 00:00:00 2001 From: pxp9 Date: Fri, 1 Sep 2023 17:09:23 +0200 Subject: [PATCH] stupid mysql does not work anything , i hate it so much :/ --- .env | 3 +- fang/src/asynk/async_queue.rs | 53 +++++++++++++++++++ fang/src/asynk/backend_sqlx.rs | 50 ++++++++++------- fang/src/asynk/queries_mysql/fail_task.sql | 3 ++ .../asynk/queries_mysql/fetch_task_type.sql | 1 + .../asynk/queries_mysql/find_task_by_id.sql | 1 + .../queries_mysql/find_task_by_uniq_hash.sql | 1 + fang/src/asynk/queries_mysql/insert_task.sql | 7 +++ .../asynk/queries_mysql/insert_task_uniq.sql | 5 ++ .../remove_all_scheduled_tasks.sql | 1 + .../asynk/queries_mysql/remove_all_tasks.sql | 1 + fang/src/asynk/queries_mysql/remove_task.sql | 1 + .../queries_mysql/remove_task_by_metadata.sql | 1 + .../asynk/queries_mysql/remove_tasks_type.sql | 1 + fang/src/asynk/queries_mysql/retry_task.sql | 3 ++ .../asynk/queries_mysql/update_task_state.sql | 5 ++ 16 files changed, 118 insertions(+), 19 deletions(-) create mode 100644 fang/src/asynk/queries_mysql/fail_task.sql create mode 100644 fang/src/asynk/queries_mysql/fetch_task_type.sql create mode 100644 fang/src/asynk/queries_mysql/find_task_by_id.sql create mode 100644 fang/src/asynk/queries_mysql/find_task_by_uniq_hash.sql create mode 100644 fang/src/asynk/queries_mysql/insert_task.sql create mode 100644 fang/src/asynk/queries_mysql/insert_task_uniq.sql create mode 100644 fang/src/asynk/queries_mysql/remove_all_scheduled_tasks.sql create mode 100644 fang/src/asynk/queries_mysql/remove_all_tasks.sql create mode 100644 fang/src/asynk/queries_mysql/remove_task.sql create mode 100644 fang/src/asynk/queries_mysql/remove_task_by_metadata.sql create mode 100644 fang/src/asynk/queries_mysql/remove_tasks_type.sql create mode 100644 fang/src/asynk/queries_mysql/retry_task.sql create mode 100644 fang/src/asynk/queries_mysql/update_task_state.sql diff --git a/.env b/.env index df954bbb..91c5e820 100644 --- a/.env +++ b/.env @@ -25,5 +25,6 @@ SQLITE_TESTS_DIR=tests_sqlite HOST=127.0.0.1 POSTGRES_BASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${HOST} POSTGRES_URL=${POSTGRES_BASE_URL}/${POSTGRES_DB} -MYSQL_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@${HOST}/${MYSQL_DB} +MYSQL_BASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@${HOST} +MYSQL_URL=${MYSQL_BASE_URL}/${MYSQL_DB} DATABASE_URL=${POSTGRES_URL} diff --git a/fang/src/asynk/async_queue.rs b/fang/src/asynk/async_queue.rs index e39a25a0..aeede499 100644 --- a/fang/src/asynk/async_queue.rs +++ b/fang/src/asynk/async_queue.rs @@ -250,6 +250,56 @@ impl AsyncQueue { res.connect().await.expect("fail to connect"); res } + + /// Provides an AsyncQueue connected to its own DB + pub async fn test_mysql() -> Self { + dotenvy::dotenv().expect(".env file not found"); + let base_url = env::var("MYSQL_BASE_URL").expect("Base URL for MySQL not found"); + let base_db = env::var("MYSQL_DB").expect("Name for base MySQL DB not found"); + + let mut res = Self::builder() + .max_pool_size(1_u32) + .uri(format!("{}/{}", base_url, base_db)) + .build(); + + let mut new_number = ASYNC_QUEUE_POSTGRES_TEST_COUNTER.lock().await; + res.connect().await.unwrap(); + + let db_name = format!("async_queue_test_{}", *new_number); + *new_number += 1; + + let create_query: &str = &format!( + "CREATE DATABASE {}; CREATE TABLE {}.fang_tasks LIKE fang.fang_tasks;", + db_name, db_name + ); + + let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name); + + let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap(); + + log::info!("Deleting database {db_name} ..."); + conn.execute(delete_query).await.unwrap(); + + log::info!("Creating database {db_name} ..."); + let expected_error: &str = &format!( + "source database \"{}\" is being accessed by other users", + base_db + ); + while let Err(e) = conn.execute(create_query).await { + if e.as_database_error().unwrap().message() != expected_error { + panic!("{:?}", e); + } + } + + log::info!("Database {db_name} created !!"); + + res.connected = false; + res.pool = None; + res.uri = format!("{}/{}", base_url, db_name); + res.connect().await.unwrap(); + + res + } } impl AsyncQueue { @@ -860,3 +910,6 @@ impl AsyncQueueable for AsyncQueue { test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()} #[cfg(test)] test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()} + +#[cfg(test)] +test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()} diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index 8734eb0f..c0abecb7 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -36,6 +36,23 @@ const FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE: &str = const FIND_TASK_BY_ID_QUERY_SQLITE: &str = include_str!("queries_sqlite/find_task_by_id.sql"); const RETRY_TASK_QUERY_SQLITE: &str = include_str!("queries_sqlite/retry_task.sql"); +const INSERT_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/insert_task.sql"); +const INSERT_TASK_UNIQ_QUERY_MYSQL: &str = include_str!("queries_mysql/insert_task_uniq.sql"); +const UPDATE_TASK_STATE_QUERY_MYSQL: &str = include_str!("queries_mysql/update_task_state.sql"); +const FAIL_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/fail_task.sql"); +const REMOVE_ALL_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/remove_all_tasks.sql"); +const REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL: &str = + include_str!("queries_mysql/remove_all_scheduled_tasks.sql"); +const REMOVE_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/remove_task.sql"); +const REMOVE_TASK_BY_METADATA_QUERY_MYSQL: &str = + include_str!("queries_mysql/remove_task_by_metadata.sql"); +const REMOVE_TASKS_TYPE_QUERY_MYSQL: &str = include_str!("queries_mysql/remove_tasks_type.sql"); +const FETCH_TASK_TYPE_QUERY_MYSQL: &str = include_str!("queries_mysql/fetch_task_type.sql"); +const FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL: &str = + include_str!("queries_mysql/find_task_by_uniq_hash.sql"); +const FIND_TASK_BY_ID_QUERY_MYSQL: &str = include_str!("queries_mysql/find_task_by_id.sql"); +const RETRY_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/retry_task.sql"); + #[derive(Debug, Clone)] pub(crate) enum BackendSqlX { Pg, @@ -151,24 +168,21 @@ struct BackendSqlXMySQL {} impl BackendSqlXMySQL { fn select_query(query: SqlXQuery) -> &'static str { - // TODO: MySQL queries - let _query = match query { - Q::InsertTask => INSERT_TASK_QUERY_SQLITE, - Q::InsertTaskUniq => INSERT_TASK_UNIQ_QUERY_SQLITE, - Q::UpdateTaskState => UPDATE_TASK_STATE_QUERY_SQLITE, - Q::FailTask => FAIL_TASK_QUERY_SQLITE, - Q::RemoveAllTask => REMOVE_ALL_TASK_QUERY_SQLITE, - Q::RemoveAllScheduledTask => REMOVE_ALL_SCHEDULED_TASK_QUERY_SQLITE, - Q::RemoveTask => REMOVE_TASK_QUERY_SQLITE, - Q::RemoveTaskByMetadata => REMOVE_TASK_BY_METADATA_QUERY_SQLITE, - Q::RemoveTaskType => REMOVE_TASKS_TYPE_QUERY_SQLITE, - Q::FetchTaskType => FETCH_TASK_TYPE_QUERY_SQLITE, - Q::FindTaskByUniqHash => FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE, - Q::FindTaskById => FIND_TASK_BY_ID_QUERY_SQLITE, - Q::RetryTask => RETRY_TASK_QUERY_SQLITE, - }; - - todo!() + match query { + Q::InsertTask => INSERT_TASK_QUERY_MYSQL, + Q::InsertTaskUniq => INSERT_TASK_UNIQ_QUERY_MYSQL, + Q::UpdateTaskState => UPDATE_TASK_STATE_QUERY_MYSQL, + Q::FailTask => FAIL_TASK_QUERY_MYSQL, + Q::RemoveAllTask => REMOVE_ALL_TASK_QUERY_MYSQL, + Q::RemoveAllScheduledTask => REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL, + Q::RemoveTask => REMOVE_TASK_QUERY_MYSQL, + Q::RemoveTaskByMetadata => REMOVE_TASK_BY_METADATA_QUERY_MYSQL, + Q::RemoveTaskType => REMOVE_TASKS_TYPE_QUERY_MYSQL, + Q::FetchTaskType => FETCH_TASK_TYPE_QUERY_MYSQL, + Q::FindTaskByUniqHash => FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL, + Q::FindTaskById => FIND_TASK_BY_ID_QUERY_MYSQL, + Q::RetryTask => RETRY_TASK_QUERY_MYSQL, + } } fn _name() -> &'static str { diff --git a/fang/src/asynk/queries_mysql/fail_task.sql b/fang/src/asynk/queries_mysql/fail_task.sql new file mode 100644 index 00000000..b89d13d1 --- /dev/null +++ b/fang/src/asynk/queries_mysql/fail_task.sql @@ -0,0 +1,3 @@ +UPDATE fang_tasks SET state = $1 , error_message = $2 , updated_at = $3 WHERE id = $4 ; + +SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $4 ; diff --git a/fang/src/asynk/queries_mysql/fetch_task_type.sql b/fang/src/asynk/queries_mysql/fetch_task_type.sql new file mode 100644 index 00000000..02c3f9f4 --- /dev/null +++ b/fang/src/asynk/queries_mysql/fetch_task_type.sql @@ -0,0 +1 @@ +SELECT id , metadata , error_message, state, task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 diff --git a/fang/src/asynk/queries_mysql/find_task_by_id.sql b/fang/src/asynk/queries_mysql/find_task_by_id.sql new file mode 100644 index 00000000..60b4cf93 --- /dev/null +++ b/fang/src/asynk/queries_mysql/find_task_by_id.sql @@ -0,0 +1 @@ +SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1 diff --git a/fang/src/asynk/queries_mysql/find_task_by_uniq_hash.sql b/fang/src/asynk/queries_mysql/find_task_by_uniq_hash.sql new file mode 100644 index 00000000..d12443ad --- /dev/null +++ b/fang/src/asynk/queries_mysql/find_task_by_uniq_hash.sql @@ -0,0 +1 @@ +SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1 diff --git a/fang/src/asynk/queries_mysql/insert_task.sql b/fang/src/asynk/queries_mysql/insert_task.sql new file mode 100644 index 00000000..6f0c2bba --- /dev/null +++ b/fang/src/asynk/queries_mysql/insert_task.sql @@ -0,0 +1,7 @@ +BEGIN + +INSERT INTO fang_tasks (id, metadata, task_type, scheduled_at) VALUES (?, ?, ?, ?); + +SELECT * FROM fang_tasks WHERE id = 'uuid'; + +END \ No newline at end of file diff --git a/fang/src/asynk/queries_mysql/insert_task_uniq.sql b/fang/src/asynk/queries_mysql/insert_task_uniq.sql new file mode 100644 index 00000000..dbbe6d73 --- /dev/null +++ b/fang/src/asynk/queries_mysql/insert_task_uniq.sql @@ -0,0 +1,5 @@ +INSERT INTO fang_tasks ( id , metadata, task_type , uniq_hash, scheduled_at) +VALUES ($1, $2 , $3, $4, $5 ) ; + + +SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1 ; diff --git a/fang/src/asynk/queries_mysql/remove_all_scheduled_tasks.sql b/fang/src/asynk/queries_mysql/remove_all_scheduled_tasks.sql new file mode 100644 index 00000000..db72fce8 --- /dev/null +++ b/fang/src/asynk/queries_mysql/remove_all_scheduled_tasks.sql @@ -0,0 +1 @@ +DELETE FROM fang_tasks WHERE scheduled_at > $1 diff --git a/fang/src/asynk/queries_mysql/remove_all_tasks.sql b/fang/src/asynk/queries_mysql/remove_all_tasks.sql new file mode 100644 index 00000000..4da949ca --- /dev/null +++ b/fang/src/asynk/queries_mysql/remove_all_tasks.sql @@ -0,0 +1 @@ +DELETE FROM fang_tasks diff --git a/fang/src/asynk/queries_mysql/remove_task.sql b/fang/src/asynk/queries_mysql/remove_task.sql new file mode 100644 index 00000000..4a384bd7 --- /dev/null +++ b/fang/src/asynk/queries_mysql/remove_task.sql @@ -0,0 +1 @@ +DELETE FROM fang_tasks WHERE id = $1 diff --git a/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql b/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql new file mode 100644 index 00000000..85cb4eea --- /dev/null +++ b/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql @@ -0,0 +1 @@ +DELETE FROM fang_tasks WHERE uniq_hash = $1 diff --git a/fang/src/asynk/queries_mysql/remove_tasks_type.sql b/fang/src/asynk/queries_mysql/remove_tasks_type.sql new file mode 100644 index 00000000..a12477fc --- /dev/null +++ b/fang/src/asynk/queries_mysql/remove_tasks_type.sql @@ -0,0 +1 @@ +DELETE FROM fang_tasks WHERE task_type = $1 diff --git a/fang/src/asynk/queries_mysql/retry_task.sql b/fang/src/asynk/queries_mysql/retry_task.sql new file mode 100644 index 00000000..4be5ea8f --- /dev/null +++ b/fang/src/asynk/queries_mysql/retry_task.sql @@ -0,0 +1,3 @@ +UPDATE fang_tasks SET state = 'retried' , error_message = $1, retries = $2, scheduled_at = $3, updated_at = $4 WHERE id = $5; + +SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $5 ; \ No newline at end of file diff --git a/fang/src/asynk/queries_mysql/update_task_state.sql b/fang/src/asynk/queries_mysql/update_task_state.sql new file mode 100644 index 00000000..eefb3c23 --- /dev/null +++ b/fang/src/asynk/queries_mysql/update_task_state.sql @@ -0,0 +1,5 @@ +BEGIN + +UPDATE fang_tasks SET state = $1 , updated_at = $2 WHERE id = $3; + +END \ No newline at end of file