From 16e94ef60ce90c8e9b26576150815c9ba8d7615d Mon Sep 17 00:00:00 2001 From: pxp9 Date: Sat, 2 Sep 2023 12:09:51 +0200 Subject: [PATCH] debugging issue : the issue is related to how a uniq task is inserted in MySQL backend --- .../up.sql | 2 +- fang/src/asynk/async_queue.rs | 43 +----- fang/src/asynk/backend_sqlx.rs | 136 +++++++++--------- .../asynk/queries_mysql/insert_task_uniq.sql | 6 +- .../queries_mysql/remove_task_by_metadata.sql | 2 +- fang/src/blocking/mysql_schema.rs | 2 +- 6 files changed, 78 insertions(+), 113 deletions(-) diff --git a/fang/mysql_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql b/fang/mysql_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql index 4cd858d6..4fd52060 100644 --- a/fang/mysql_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql +++ b/fang/mysql_migrations/migrations/2023-08-17-102017_create_fang_tasks/up.sql @@ -15,7 +15,7 @@ CREATE TABLE fang_tasks ( error_message VARCHAR(2048), state ENUM('new', 'in_progress', 'failed', 'finished', 'retried') NOT NULL DEFAULT 'new', task_type VARCHAR(255) NOT NULL DEFAULT 'common', -- TEXT type can not have default value, stupid MySQL policy - uniq_hash CHAR(64), + uniq_hash VARCHAR(64), retries INTEGER NOT NULL DEFAULT 0, scheduled_at VARCHAR(32) NOT NULL DEFAULT(CONCAT(CURRENT_TIMESTAMP, '.000000000+00')), created_at VARCHAR(32) NOT NULL DEFAULT (CONCAT(CURRENT_TIMESTAMP , '.000000000+00')), diff --git a/fang/src/asynk/async_queue.rs b/fang/src/asynk/async_queue.rs index d3701b7b..472eddc5 100644 --- a/fang/src/asynk/async_queue.rs +++ b/fang/src/asynk/async_queue.rs @@ -389,7 +389,7 @@ impl AsyncQueue { Ok(task) } - async fn insert_task_uniq_query( + async fn insert_task_if_not_exist_query( transaction: &mut Transaction<'_, Any>, backend: &BackendSqlX, metadata: &serde_json::Value, @@ -397,52 +397,17 @@ impl AsyncQueue { scheduled_at: &DateTime, ) -> Result { let query_params = QueryParams::builder() - .metadata(&metadata) + .metadata(metadata) .task_type(task_type) .scheduled_at(scheduled_at) .build(); let task = backend - .execute_query(SqlXQuery::InsertTaskUniq, transaction, query_params) + .execute_query(SqlXQuery::InsertTaskIfNotExists, transaction, query_params) .await? .unwrap_task(); - Ok(task) - } - async fn insert_task_if_not_exist_query( - transaction: &mut Transaction<'_, Any>, - backend: &BackendSqlX, - metadata: &serde_json::Value, - task_type: &str, - scheduled_at: &DateTime, - ) -> Result { - match Self::find_task_by_uniq_hash_query(transaction, backend, &metadata).await { - Some(task) => Ok(task), - None => { - Self::insert_task_uniq_query( - transaction, - backend, - metadata, - task_type, - scheduled_at, - ) - .await - } - } - } - - async fn find_task_by_uniq_hash_query( - transaction: &mut Transaction<'_, Any>, - backend: &BackendSqlX, - metadata: &serde_json::Value, - ) -> Option { - let query_params = QueryParams::builder().metadata(metadata).build(); - - backend - .execute_query(SqlXQuery::FindTaskByUniqHash, transaction, query_params) - .await - .ok()? - .unwrap_opt_task() + Ok(task) } async fn schedule_task_query( diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index e86d42e3..06dabd2a 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -94,7 +94,6 @@ pub(crate) struct QueryParams<'a> { pub(crate) enum Res { BIGINT(u64), Task(Task), - OptTask(Option), } impl Res { @@ -111,13 +110,6 @@ impl Res { _ => panic!("Can not unwrap a task"), } } - - pub(crate) fn unwrap_opt_task(self) -> Option { - match self { - Res::OptTask(opt_task) => opt_task, - _ => panic!("Can not unwrap a opt_task"), - } - } } impl BackendSqlX { @@ -160,7 +152,6 @@ impl BackendSqlX { #[derive(Debug, Clone)] pub(crate) enum SqlXQuery { InsertTask, - InsertTaskUniq, UpdateTaskState, FailTask, RemoveAllTask, @@ -169,9 +160,9 @@ pub(crate) enum SqlXQuery { RemoveTaskByMetadata, RemoveTaskType, FetchTaskType, - FindTaskByUniqHash, FindTaskById, RetryTask, + InsertTaskIfNotExists, } #[derive(Debug, Clone)] @@ -196,15 +187,6 @@ impl BackendSqlXPg { Ok(Res::Task(task)) } - Q::InsertTaskUniq => { - let task = general_any_impl_insert_task_uniq( - INSERT_TASK_UNIQ_QUERY_POSTGRES, - transaction, - params, - ) - .await?; - Ok(Res::Task(task)) - } Q::UpdateTaskState => { let task = general_any_impl_update_task_state( UPDATE_TASK_STATE_QUERY_POSTGRES, @@ -273,16 +255,6 @@ impl BackendSqlXPg { .await?; Ok(Res::Task(task)) } - Q::FindTaskByUniqHash => { - let opt_task: Option = general_any_impl_find_task_by_uniq_hash( - FIND_TASK_BY_UNIQ_HASH_QUERY_POSTGRES, - transaction, - params, - ) - .await; - - Ok(Res::OptTask(opt_task)) - } Q::FindTaskById => { let task = general_any_impl_find_task_by_id( FIND_TASK_BY_ID_QUERY_POSTGRES, @@ -297,6 +269,19 @@ impl BackendSqlXPg { general_any_impl_retry_task(RETRY_TASK_QUERY_POSTGRES, transaction, params) .await?; + Ok(Res::Task(task)) + } + Q::InsertTaskIfNotExists => { + let task = general_any_impl_insert_task_if_not_exists( + ( + FIND_TASK_BY_UNIQ_HASH_QUERY_POSTGRES, + INSERT_TASK_UNIQ_QUERY_POSTGRES, + ), + transaction, + params, + ) + .await?; + Ok(Res::Task(task)) } } @@ -324,15 +309,6 @@ impl BackendSqlXSQLite { Ok(Res::Task(task)) } - Q::InsertTaskUniq => { - let task = general_any_impl_insert_task_uniq( - INSERT_TASK_UNIQ_QUERY_SQLITE, - transaction, - params, - ) - .await?; - Ok(Res::Task(task)) - } Q::UpdateTaskState => { let task = general_any_impl_update_task_state( UPDATE_TASK_STATE_QUERY_SQLITE, @@ -400,16 +376,6 @@ impl BackendSqlXSQLite { .await?; Ok(Res::Task(task)) } - Q::FindTaskByUniqHash => { - let opt_task: Option = general_any_impl_find_task_by_uniq_hash( - FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE, - transaction, - params, - ) - .await; - - Ok(Res::OptTask(opt_task)) - } Q::FindTaskById => { let task = general_any_impl_find_task_by_id( FIND_TASK_BY_ID_QUERY_SQLITE, @@ -424,6 +390,19 @@ impl BackendSqlXSQLite { general_any_impl_retry_task(RETRY_TASK_QUERY_SQLITE, transaction, params) .await?; + Ok(Res::Task(task)) + } + Q::InsertTaskIfNotExists => { + let task = general_any_impl_insert_task_if_not_exists( + ( + FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE, + INSERT_TASK_UNIQ_QUERY_SQLITE, + ), + transaction, + params, + ) + .await?; + Ok(Res::Task(task)) } } @@ -434,6 +413,17 @@ impl BackendSqlXSQLite { } } +async fn general_any_impl_insert_task_if_not_exists( + queries: (&str, &str), + transaction: &mut Transaction<'_, Any>, + params: QueryParams<'_>, +) -> Result { + match general_any_impl_find_task_by_uniq_hash(queries.0, transaction, ¶ms).await { + Some(task) => Ok(task), + None => general_any_impl_insert_task_uniq(queries.1, transaction, params).await, + } +} + async fn general_any_impl_insert_task( query: &str, transaction: &mut Transaction<'_, Any>, @@ -604,6 +594,8 @@ async fn general_any_impl_remove_task_by_metadata( let uniq_hash = calculate_hash(&metadata.to_string()); + println!("{query}"); + Ok(sqlx::query(query) .bind(uniq_hash) .execute(transaction.acquire().await?) @@ -646,7 +638,7 @@ async fn general_any_impl_fetch_task_type( async fn general_any_impl_find_task_by_uniq_hash( query: &str, transaction: &mut Transaction<'_, Any>, - params: QueryParams<'_>, + params: &QueryParams<'_>, ) -> Option { let metadata = params.metadata.unwrap(); @@ -729,13 +721,6 @@ impl BackendSqlXMySQL { Ok(Res::Task(task)) } - Q::InsertTaskUniq => { - let task = - mysql_impl_insert_task_uniq(INSERT_TASK_UNIQ_QUERY_MYSQL, transaction, params) - .await?; - Ok(Res::Task(task)) - } - Q::UpdateTaskState => { let task = mysql_impl_update_task_state( UPDATE_TASK_STATE_QUERY_MYSQL, @@ -806,16 +791,6 @@ impl BackendSqlXMySQL { .await?; Ok(Res::Task(task)) } - Q::FindTaskByUniqHash => { - let opt_task: Option = general_any_impl_find_task_by_uniq_hash( - FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL, - transaction, - params, - ) - .await; - - Ok(Res::OptTask(opt_task)) - } Q::FindTaskById => { let task: Task = general_any_impl_find_task_by_id( FIND_TASK_BY_ID_QUERY_MYSQL, @@ -830,6 +805,19 @@ impl BackendSqlXMySQL { let task = mysql_impl_retry_task(RETRY_TASK_QUERY_MYSQL, transaction, params).await?; + Ok(Res::Task(task)) + } + Q::InsertTaskIfNotExists => { + let task = mysql_any_impl_insert_task_if_not_exists( + ( + FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL, + INSERT_TASK_UNIQ_QUERY_MYSQL, + ), + transaction, + params, + ) + .await?; + Ok(Res::Task(task)) } } @@ -895,6 +883,10 @@ async fn mysql_impl_insert_task_uniq( let uniq_hash = calculate_hash(&metadata_str); + println!("{} len : {}", uniq_hash, uniq_hash.len()); + + println!("reach here"); + let affected_rows = sqlx::query(query) .bind(uuid_as_str) .bind(metadata_str) @@ -904,6 +896,7 @@ async fn mysql_impl_insert_task_uniq( .execute(transaction.acquire().await?) .await? .rows_affected(); + println!("reach here 2"); if affected_rows != 1 { // here we should return an error @@ -1034,3 +1027,14 @@ async fn mysql_impl_retry_task( Ok(failed_task) } + +async fn mysql_any_impl_insert_task_if_not_exists( + queries: (&str, &str), + transaction: &mut Transaction<'_, Any>, + params: QueryParams<'_>, +) -> Result { + match general_any_impl_find_task_by_uniq_hash(queries.0, transaction, ¶ms).await { + Some(task) => Ok(task), + None => mysql_impl_insert_task_uniq(queries.1, transaction, params).await, + } +} diff --git a/fang/src/asynk/queries_mysql/insert_task_uniq.sql b/fang/src/asynk/queries_mysql/insert_task_uniq.sql index dbbe6d73..000a3d7e 100644 --- a/fang/src/asynk/queries_mysql/insert_task_uniq.sql +++ b/fang/src/asynk/queries_mysql/insert_task_uniq.sql @@ -1,5 +1 @@ -INSERT INTO fang_tasks ( id , metadata, task_type , uniq_hash, scheduled_at) -VALUES ($1, $2 , $3, $4, $5 ) ; - - -SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1 ; +INSERT INTO fang_tasks(id,metadata,task_type,uniq_hash,scheduled_at) VALUES (?, ? , ?, ?, ?) ; diff --git a/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql b/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql index 966ab747..f8474e89 100644 --- a/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql +++ b/fang/src/asynk/queries_mysql/remove_task_by_metadata.sql @@ -1 +1 @@ -DELETE FROM fang_tasks WHERE uniq_hash = ? +DELETE FROM fang_tasks WHERE uniq_hash = ? ; diff --git a/fang/src/blocking/mysql_schema.rs b/fang/src/blocking/mysql_schema.rs index 8445e156..d00b1a4f 100644 --- a/fang/src/blocking/mysql_schema.rs +++ b/fang/src/blocking/mysql_schema.rs @@ -22,7 +22,7 @@ diesel::table! { #[max_length = 255] task_type -> Varchar, #[max_length = 64] - uniq_hash -> Nullable, + uniq_hash -> Nullable, retries -> Integer, #[max_length = 32] scheduled_at -> Varchar,