Skip to content

Commit

Permalink
debugging issue : the issue is related to how a uniq task is inserted…
Browse files Browse the repository at this point in the history
… in MySQL backend
  • Loading branch information
pxp9 committed Sep 2, 2023
1 parent fa4f135 commit 16e94ef
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CREATE TABLE fang_tasks (
error_message VARCHAR(2048),
state ENUM('new', 'in_progress', 'failed', 'finished', 'retried') NOT NULL DEFAULT 'new',
task_type VARCHAR(255) NOT NULL DEFAULT 'common', -- TEXT type can not have default value, stupid MySQL policy
uniq_hash CHAR(64),
uniq_hash VARCHAR(64),
retries INTEGER NOT NULL DEFAULT 0,
scheduled_at VARCHAR(32) NOT NULL DEFAULT(CONCAT(CURRENT_TIMESTAMP, '.000000000+00')),
created_at VARCHAR(32) NOT NULL DEFAULT (CONCAT(CURRENT_TIMESTAMP , '.000000000+00')),
Expand Down
43 changes: 4 additions & 39 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,60 +389,25 @@ impl AsyncQueue {
Ok(task)
}

async fn insert_task_uniq_query(
async fn insert_task_if_not_exist_query(
transaction: &mut Transaction<'_, Any>,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
scheduled_at: &DateTime<Utc>,
) -> Result<Task, AsyncQueueError> {
let query_params = QueryParams::builder()
.metadata(&metadata)
.metadata(metadata)
.task_type(task_type)
.scheduled_at(scheduled_at)
.build();

let task = backend
.execute_query(SqlXQuery::InsertTaskUniq, transaction, query_params)
.execute_query(SqlXQuery::InsertTaskIfNotExists, transaction, query_params)
.await?
.unwrap_task();
Ok(task)
}

async fn insert_task_if_not_exist_query(
transaction: &mut Transaction<'_, Any>,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
scheduled_at: &DateTime<Utc>,
) -> Result<Task, AsyncQueueError> {
match Self::find_task_by_uniq_hash_query(transaction, backend, &metadata).await {
Some(task) => Ok(task),
None => {
Self::insert_task_uniq_query(
transaction,
backend,
metadata,
task_type,
scheduled_at,
)
.await
}
}
}

async fn find_task_by_uniq_hash_query(
transaction: &mut Transaction<'_, Any>,
backend: &BackendSqlX,
metadata: &serde_json::Value,
) -> Option<Task> {
let query_params = QueryParams::builder().metadata(metadata).build();

backend
.execute_query(SqlXQuery::FindTaskByUniqHash, transaction, query_params)
.await
.ok()?
.unwrap_opt_task()
Ok(task)
}

async fn schedule_task_query(
Expand Down
136 changes: 70 additions & 66 deletions fang/src/asynk/backend_sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ pub(crate) struct QueryParams<'a> {
pub(crate) enum Res {
BIGINT(u64),
Task(Task),
OptTask(Option<Task>),
}

impl Res {
Expand All @@ -111,13 +110,6 @@ impl Res {
_ => panic!("Can not unwrap a task"),
}
}

pub(crate) fn unwrap_opt_task(self) -> Option<Task> {
match self {
Res::OptTask(opt_task) => opt_task,
_ => panic!("Can not unwrap a opt_task"),
}
}
}

impl BackendSqlX {
Expand Down Expand Up @@ -160,7 +152,6 @@ impl BackendSqlX {
#[derive(Debug, Clone)]
pub(crate) enum SqlXQuery {
InsertTask,
InsertTaskUniq,
UpdateTaskState,
FailTask,
RemoveAllTask,
Expand All @@ -169,9 +160,9 @@ pub(crate) enum SqlXQuery {
RemoveTaskByMetadata,
RemoveTaskType,
FetchTaskType,
FindTaskByUniqHash,
FindTaskById,
RetryTask,
InsertTaskIfNotExists,
}

#[derive(Debug, Clone)]
Expand All @@ -196,15 +187,6 @@ impl BackendSqlXPg {

Ok(Res::Task(task))
}
Q::InsertTaskUniq => {
let task = general_any_impl_insert_task_uniq(
INSERT_TASK_UNIQ_QUERY_POSTGRES,
transaction,
params,
)
.await?;
Ok(Res::Task(task))
}
Q::UpdateTaskState => {
let task = general_any_impl_update_task_state(
UPDATE_TASK_STATE_QUERY_POSTGRES,
Expand Down Expand Up @@ -273,16 +255,6 @@ impl BackendSqlXPg {
.await?;
Ok(Res::Task(task))
}
Q::FindTaskByUniqHash => {
let opt_task: Option<Task> = general_any_impl_find_task_by_uniq_hash(
FIND_TASK_BY_UNIQ_HASH_QUERY_POSTGRES,
transaction,
params,
)
.await;

Ok(Res::OptTask(opt_task))
}
Q::FindTaskById => {
let task = general_any_impl_find_task_by_id(
FIND_TASK_BY_ID_QUERY_POSTGRES,
Expand All @@ -297,6 +269,19 @@ impl BackendSqlXPg {
general_any_impl_retry_task(RETRY_TASK_QUERY_POSTGRES, transaction, params)
.await?;

Ok(Res::Task(task))
}
Q::InsertTaskIfNotExists => {
let task = general_any_impl_insert_task_if_not_exists(
(
FIND_TASK_BY_UNIQ_HASH_QUERY_POSTGRES,
INSERT_TASK_UNIQ_QUERY_POSTGRES,
),
transaction,
params,
)
.await?;

Ok(Res::Task(task))
}
}
Expand Down Expand Up @@ -324,15 +309,6 @@ impl BackendSqlXSQLite {

Ok(Res::Task(task))
}
Q::InsertTaskUniq => {
let task = general_any_impl_insert_task_uniq(
INSERT_TASK_UNIQ_QUERY_SQLITE,
transaction,
params,
)
.await?;
Ok(Res::Task(task))
}
Q::UpdateTaskState => {
let task = general_any_impl_update_task_state(
UPDATE_TASK_STATE_QUERY_SQLITE,
Expand Down Expand Up @@ -400,16 +376,6 @@ impl BackendSqlXSQLite {
.await?;
Ok(Res::Task(task))
}
Q::FindTaskByUniqHash => {
let opt_task: Option<Task> = general_any_impl_find_task_by_uniq_hash(
FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE,
transaction,
params,
)
.await;

Ok(Res::OptTask(opt_task))
}
Q::FindTaskById => {
let task = general_any_impl_find_task_by_id(
FIND_TASK_BY_ID_QUERY_SQLITE,
Expand All @@ -424,6 +390,19 @@ impl BackendSqlXSQLite {
general_any_impl_retry_task(RETRY_TASK_QUERY_SQLITE, transaction, params)
.await?;

Ok(Res::Task(task))
}
Q::InsertTaskIfNotExists => {
let task = general_any_impl_insert_task_if_not_exists(
(
FIND_TASK_BY_UNIQ_HASH_QUERY_SQLITE,
INSERT_TASK_UNIQ_QUERY_SQLITE,
),
transaction,
params,
)
.await?;

Ok(Res::Task(task))
}
}
Expand All @@ -434,6 +413,17 @@ impl BackendSqlXSQLite {
}
}

async fn general_any_impl_insert_task_if_not_exists(
queries: (&str, &str),
transaction: &mut Transaction<'_, Any>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
match general_any_impl_find_task_by_uniq_hash(queries.0, transaction, &params).await {
Some(task) => Ok(task),
None => general_any_impl_insert_task_uniq(queries.1, transaction, params).await,
}
}

async fn general_any_impl_insert_task(
query: &str,
transaction: &mut Transaction<'_, Any>,
Expand Down Expand Up @@ -604,6 +594,8 @@ async fn general_any_impl_remove_task_by_metadata(

let uniq_hash = calculate_hash(&metadata.to_string());

println!("{query}");

Ok(sqlx::query(query)
.bind(uniq_hash)
.execute(transaction.acquire().await?)
Expand Down Expand Up @@ -646,7 +638,7 @@ async fn general_any_impl_fetch_task_type(
async fn general_any_impl_find_task_by_uniq_hash(
query: &str,
transaction: &mut Transaction<'_, Any>,
params: QueryParams<'_>,
params: &QueryParams<'_>,
) -> Option<Task> {
let metadata = params.metadata.unwrap();

Expand Down Expand Up @@ -729,13 +721,6 @@ impl BackendSqlXMySQL {

Ok(Res::Task(task))
}
Q::InsertTaskUniq => {
let task =
mysql_impl_insert_task_uniq(INSERT_TASK_UNIQ_QUERY_MYSQL, transaction, params)
.await?;
Ok(Res::Task(task))
}

Q::UpdateTaskState => {
let task = mysql_impl_update_task_state(
UPDATE_TASK_STATE_QUERY_MYSQL,
Expand Down Expand Up @@ -806,16 +791,6 @@ impl BackendSqlXMySQL {
.await?;
Ok(Res::Task(task))
}
Q::FindTaskByUniqHash => {
let opt_task: Option<Task> = general_any_impl_find_task_by_uniq_hash(
FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL,
transaction,
params,
)
.await;

Ok(Res::OptTask(opt_task))
}
Q::FindTaskById => {
let task: Task = general_any_impl_find_task_by_id(
FIND_TASK_BY_ID_QUERY_MYSQL,
Expand All @@ -830,6 +805,19 @@ impl BackendSqlXMySQL {
let task =
mysql_impl_retry_task(RETRY_TASK_QUERY_MYSQL, transaction, params).await?;

Ok(Res::Task(task))
}
Q::InsertTaskIfNotExists => {
let task = mysql_any_impl_insert_task_if_not_exists(
(
FIND_TASK_BY_UNIQ_HASH_QUERY_MYSQL,
INSERT_TASK_UNIQ_QUERY_MYSQL,
),
transaction,
params,
)
.await?;

Ok(Res::Task(task))
}
}
Expand Down Expand Up @@ -895,6 +883,10 @@ async fn mysql_impl_insert_task_uniq(

let uniq_hash = calculate_hash(&metadata_str);

println!("{} len : {}", uniq_hash, uniq_hash.len());

println!("reach here");

let affected_rows = sqlx::query(query)
.bind(uuid_as_str)
.bind(metadata_str)
Expand All @@ -904,6 +896,7 @@ async fn mysql_impl_insert_task_uniq(
.execute(transaction.acquire().await?)
.await?
.rows_affected();
println!("reach here 2");

if affected_rows != 1 {
// here we should return an error
Expand Down Expand Up @@ -1034,3 +1027,14 @@ async fn mysql_impl_retry_task(

Ok(failed_task)
}

async fn mysql_any_impl_insert_task_if_not_exists(
queries: (&str, &str),
transaction: &mut Transaction<'_, Any>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
match general_any_impl_find_task_by_uniq_hash(queries.0, transaction, &params).await {
Some(task) => Ok(task),
None => mysql_impl_insert_task_uniq(queries.1, transaction, params).await,
}
}
6 changes: 1 addition & 5 deletions fang/src/asynk/queries_mysql/insert_task_uniq.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
INSERT INTO fang_tasks ( id , metadata, task_type , uniq_hash, scheduled_at)
VALUES ($1, $2 , $3, $4, $5 ) ;


SELECT id , metadata , error_message, state , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE id = $1 ;
INSERT INTO fang_tasks(id,metadata,task_type,uniq_hash,scheduled_at) VALUES (?, ? , ?, ?, ?) ;
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_mysql/remove_task_by_metadata.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DELETE FROM fang_tasks WHERE uniq_hash = ?
DELETE FROM fang_tasks WHERE uniq_hash = ? ;
2 changes: 1 addition & 1 deletion fang/src/blocking/mysql_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ diesel::table! {
#[max_length = 255]
task_type -> Varchar,
#[max_length = 64]
uniq_hash -> Nullable<Char>,
uniq_hash -> Nullable<Varchar>,
retries -> Integer,
#[max_length = 32]
scheduled_at -> Varchar,
Expand Down

0 comments on commit 16e94ef

Please sign in to comment.