Skip to content

Commit

Permalink
stupid mysql does not work anything , i hate it so much :/
Browse files Browse the repository at this point in the history
  • Loading branch information
pxp9 committed Sep 1, 2023
1 parent 5f1c601 commit 67bf9d0
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 19 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ SQLITE_TESTS_DIR=tests_sqlite
HOST=127.0.0.1
POSTGRES_BASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${HOST}
POSTGRES_URL=${POSTGRES_BASE_URL}/${POSTGRES_DB}
MYSQL_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@${HOST}/${MYSQL_DB}
MYSQL_BASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@${HOST}
MYSQL_URL=${MYSQL_BASE_URL}/${MYSQL_DB}
DATABASE_URL=${POSTGRES_URL}
53 changes: 53 additions & 0 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,56 @@ impl AsyncQueue {
res.connect().await.expect("fail to connect");
res
}

/// Provides an AsyncQueue connected to its own DB
pub async fn test_mysql() -> Self {
dotenvy::dotenv().expect(".env file not found");
let base_url = env::var("MYSQL_BASE_URL").expect("Base URL for MySQL not found");
let base_db = env::var("MYSQL_DB").expect("Name for base MySQL DB not found");

let mut res = Self::builder()
.max_pool_size(1_u32)
.uri(format!("{}/{}", base_url, base_db))
.build();

let mut new_number = ASYNC_QUEUE_POSTGRES_TEST_COUNTER.lock().await;
res.connect().await.unwrap();

let db_name = format!("async_queue_test_{}", *new_number);
*new_number += 1;

let create_query: &str = &format!(
"CREATE DATABASE {}; CREATE TABLE {}.fang_tasks LIKE fang.fang_tasks;",
db_name, db_name
);

let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();

log::info!("Creating database {db_name} ...");
let expected_error: &str = &format!(
"source database \"{}\" is being accessed by other users",
base_db
);
while let Err(e) = conn.execute(create_query).await {
if e.as_database_error().unwrap().message() != expected_error {
panic!("{:?}", e);
}
}

log::info!("Database {db_name} created !!");

res.connected = false;
res.pool = None;
res.uri = format!("{}/{}", base_url, db_name);
res.connect().await.unwrap();

res
}
}

impl AsyncQueue {
Expand Down Expand Up @@ -860,3 +910,6 @@ impl AsyncQueueable for AsyncQueue {
test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()}
#[cfg(test)]
test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()}

#[cfg(test)]
test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()}
50 changes: 32 additions & 18 deletions fang/src/asynk/backend_sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ const FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE: &str =
const FIND_TASK_BY_ID_QUERY_SQLITE: &str = include_str!("queries_sqlite/find_task_by_id.sql");
const RETRY_TASK_QUERY_SQLITE: &str = include_str!("queries_sqlite/retry_task.sql");

const INSERT_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/insert_task.sql");
const INSERT_TASK_UNIQ_QUERY_MYSQL: &str = include_str!("queries_mysql/insert_task_uniq.sql");
const UPDATE_TASK_STATE_QUERY_MYSQL: &str = include_str!("queries_mysql/update_task_state.sql");
const FAIL_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/fail_task.sql");
const REMOVE_ALL_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/remove_all_tasks.sql");
const REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL: &str =
include_str!("queries_mysql/remove_all_scheduled_tasks.sql");
const REMOVE_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/remove_task.sql");
const REMOVE_TASK_BY_METADATA_QUERY_MYSQL: &str =
include_str!("queries_mysql/remove_task_by_metadata.sql");
const REMOVE_TASKS_TYPE_QUERY_MYSQL: &str = include_str!("queries_mysql/remove_tasks_type.sql");
const FETCH_TASK_TYPE_QUERY_MYSQL: &str = include_str!("queries_mysql/fetch_task_type.sql");
const FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL: &str =
include_str!("queries_mysql/find_task_by_uniq_hash.sql");
const FIND_TASK_BY_ID_QUERY_MYSQL: &str = include_str!("queries_mysql/find_task_by_id.sql");
const RETRY_TASK_QUERY_MYSQL: &str = include_str!("queries_mysql/retry_task.sql");

#[derive(Debug, Clone)]
pub(crate) enum BackendSqlX {
Pg,
Expand Down Expand Up @@ -151,24 +168,21 @@ struct BackendSqlXMySQL {}

impl BackendSqlXMySQL {
fn select_query(query: SqlXQuery) -> &'static str {
// TODO: MySQL queries
let _query = match query {
Q::InsertTask => INSERT_TASK_QUERY_SQLITE,
Q::InsertTaskUniq => INSERT_TASK_UNIQ_QUERY_SQLITE,
Q::UpdateTaskState => UPDATE_TASK_STATE_QUERY_SQLITE,
Q::FailTask => FAIL_TASK_QUERY_SQLITE,
Q::RemoveAllTask => REMOVE_ALL_TASK_QUERY_SQLITE,
Q::RemoveAllScheduledTask => REMOVE_ALL_SCHEDULED_TASK_QUERY_SQLITE,
Q::RemoveTask => REMOVE_TASK_QUERY_SQLITE,
Q::RemoveTaskByMetadata => REMOVE_TASK_BY_METADATA_QUERY_SQLITE,
Q::RemoveTaskType => REMOVE_TASKS_TYPE_QUERY_SQLITE,
Q::FetchTaskType => FETCH_TASK_TYPE_QUERY_SQLITE,
Q::FindTaskByUniqHash => FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE,
Q::FindTaskById => FIND_TASK_BY_ID_QUERY_SQLITE,
Q::RetryTask => RETRY_TASK_QUERY_SQLITE,
};

todo!()
match query {
Q::InsertTask => INSERT_TASK_QUERY_MYSQL,
Q::InsertTaskUniq => INSERT_TASK_UNIQ_QUERY_MYSQL,
Q::UpdateTaskState => UPDATE_TASK_STATE_QUERY_MYSQL,
Q::FailTask => FAIL_TASK_QUERY_MYSQL,
Q::RemoveAllTask => REMOVE_ALL_TASK_QUERY_MYSQL,
Q::RemoveAllScheduledTask => REMOVE_ALL_SCHEDULED_TASK_QUERY_MYSQL,
Q::RemoveTask => REMOVE_TASK_QUERY_MYSQL,
Q::RemoveTaskByMetadata => REMOVE_TASK_BY_METADATA_QUERY_MYSQL,
Q::RemoveTaskType => REMOVE_TASKS_TYPE_QUERY_MYSQL,
Q::FetchTaskType => FETCH_TASK_TYPE_QUERY_MYSQL,
Q::FindTaskByUniqHash => FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL,
Q::FindTaskById => FIND_TASK_BY_ID_QUERY_MYSQL,
Q::RetryTask => RETRY_TASK_QUERY_MYSQL,
}
}

fn _name() -> &'static str {
Expand Down
3 changes: 3 additions & 0 deletions fang/src/asynk/queries_mysql/fail_task.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE fang_tasks SET state = $1 , error_message = $2 , updated_at = $3 WHERE id = $4 ;

SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $4 ;
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/fetch_task_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT id , metadata , error_message, state, task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/find_task_by_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/find_task_by_uniq_hash.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1
7 changes: 7 additions & 0 deletions fang/src/asynk/queries_mysql/insert_task.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
BEGIN

INSERT INTO fang_tasks (id, metadata, task_type, scheduled_at) VALUES (?, ?, ?, ?);

SELECT * FROM fang_tasks WHERE id = 'uuid';

END
5 changes: 5 additions & 0 deletions fang/src/asynk/queries_mysql/insert_task_uniq.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
INSERT INTO fang_tasks ( id , metadata, task_type , uniq_hash, scheduled_at)
VALUES ($1, $2 , $3, $4, $5 ) ;


SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1 ;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM fang_tasks WHERE scheduled_at > $1
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/remove_all_tasks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM fang_tasks
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/remove_task.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM fang_tasks WHERE id = $1
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/remove_task_by_metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM fang_tasks WHERE uniq_hash = $1
1 change: 1 addition & 0 deletions fang/src/asynk/queries_mysql/remove_tasks_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM fang_tasks WHERE task_type = $1
3 changes: 3 additions & 0 deletions fang/src/asynk/queries_mysql/retry_task.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE fang_tasks SET state = 'retried' , error_message = $1, retries = $2, scheduled_at = $3, updated_at = $4 WHERE id = $5;

SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $5 ;
5 changes: 5 additions & 0 deletions fang/src/asynk/queries_mysql/update_task_state.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN

UPDATE fang_tasks SET state = $1 , updated_at = $2 WHERE id = $3;

END

0 comments on commit 67bf9d0

Please sign in to comment.