Skip to content

Commit

Permalink
use variant NoBackend instead Option
Browse files Browse the repository at this point in the history
  • Loading branch information
pxp9 committed Apr 15, 2024
1 parent 0dd461f commit edf744a
Showing 1 changed file with 56 additions and 68 deletions.
124 changes: 56 additions & 68 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub(crate) enum InternalPool {
MySql(MySqlPool),
#[cfg(feature = "asynk-sqlite")]
Sqlite(SqlitePool),
NoBackend,
}

impl InternalPool {
Expand Down Expand Up @@ -192,22 +193,23 @@ impl InternalPool {
}
}

pub(crate) fn backend(&self) -> BackendSqlX {
pub(crate) fn backend(&self) -> Result<BackendSqlX, AsyncQueueError> {
match *self {
#[cfg(feature = "asynk-postgres")]
InternalPool::Pg(_) => BackendSqlX::Pg,
InternalPool::Pg(_) => Ok(BackendSqlX::Pg),
#[cfg(feature = "asynk-mysql")]
InternalPool::MySql(_) => BackendSqlX::MySql,
InternalPool::MySql(_) => Ok(BackendSqlX::MySql),
#[cfg(feature = "asynk-sqlite")]
InternalPool::Sqlite(_) => BackendSqlX::Sqlite,
InternalPool::Sqlite(_) => Ok(BackendSqlX::Sqlite),
InternalPool::NoBackend => Err(AsyncQueueError::NotConnectedError),
}
}
}

#[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue {
#[builder(default=None, setter(skip))]
pool: Option<InternalPool>,
#[builder(default=InternalPool::NoBackend, setter(skip))]
pool: InternalPool,
#[builder(setter(into))]
uri: String,
#[builder(setter(into))]
Expand Down Expand Up @@ -272,8 +274,7 @@ async fn get_pool(

Ok(InternalPool::Sqlite(pool))
}
#[allow(unreachable_patterns)]
_ => panic!("Not a valid backend"),
_ => Err(AsyncQueueError::NotConnectedError),
}
}

Expand All @@ -295,7 +296,7 @@ impl AsyncQueue {

let pool = get_pool(kind, &self.uri, self.max_pool_size).await?;

self.pool = Some(pool);
self.pool = pool;
self.connected = true;
Ok(())
}
Expand Down Expand Up @@ -424,12 +425,13 @@ impl AsyncQueue {
impl AsyncQueueable for AsyncQueue {
async fn find_task_by_id(&mut self, id: &Uuid) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;

let backend = pool.backend()?;

let query_params = QueryParams::builder().uuid(id).build();

let task = pool
.backend()
let task = backend
.execute_query(SqlXQuery::FindTaskById, pool, query_params)
.await?
.unwrap_task();
Expand All @@ -443,32 +445,30 @@ impl AsyncQueueable for AsyncQueue {
) -> Result<Option<Task>, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;

let task = Self::fetch_and_touch_task_query(pool, &pool.backend(), task_type).await?;
let backend = pool.backend()?;

let task = Self::fetch_and_touch_task_query(pool, &backend, task_type).await?;

Ok(task)
}

async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let metadata = serde_json::to_value(task)?;

let task = if !task.uniq() {
Self::insert_task_query(
pool,
&pool.backend(),
&metadata,
&task.task_type(),
&Utc::now(),
)
.await?
Self::insert_task_query(pool, &backend, &metadata, &task.task_type(), &Utc::now())
.await?
} else {
Self::insert_task_if_not_exist_query(
pool,
&pool.backend(),
&backend,
&metadata,
&task.task_type(),
&Utc::now(),
Expand All @@ -482,22 +482,23 @@ impl AsyncQueueable for AsyncQueue {
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let task = Self::schedule_task_query(pool, &pool.backend(), task).await?;
let task = Self::schedule_task_query(pool, &backend, task).await?;

Ok(task)
}

async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder().build();

let result = pool
.backend()
let result = backend
.execute_query(SqlXQuery::RemoveAllTask, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -508,12 +509,13 @@ impl AsyncQueueable for AsyncQueue {
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
// this unwrap is safe because we check if connection is established
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;

let backend = pool.backend()?;

let query_params = QueryParams::builder().build();

let result = pool
.backend()
let result = backend
.execute_query(SqlXQuery::RemoveAllScheduledTask, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -523,12 +525,12 @@ impl AsyncQueueable for AsyncQueue {

async fn remove_task(&mut self, id: &Uuid) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder().uuid(id).build();

let result = pool
.backend()
let result = backend
.execute_query(SqlXQuery::RemoveTask, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -542,12 +544,12 @@ impl AsyncQueueable for AsyncQueue {
) -> Result<u64, AsyncQueueError> {
if task.uniq() {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder().runnable(task).build();

let result = pool
.backend()
let result = backend
.execute_query(SqlXQuery::RemoveTaskByMetadata, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -560,12 +562,12 @@ impl AsyncQueueable for AsyncQueue {

async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder().task_type(task_type).build();

let result = pool
.backend()
let result = backend
.execute_query(SqlXQuery::RemoveTaskType, pool, query_params)
.await?
.unwrap_u64();
Expand All @@ -579,12 +581,12 @@ impl AsyncQueueable for AsyncQueue {
state: FangTaskState,
) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder().uuid(&task.id).state(state).build();

let task = pool
.backend()
let task = backend
.execute_query(SqlXQuery::UpdateTaskState, pool, query_params)
.await?
.unwrap_task();
Expand All @@ -598,15 +600,15 @@ impl AsyncQueueable for AsyncQueue {
error_message: &str,
) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder()
.error_message(error_message)
.task(task)
.build();

let failed_task = pool
.backend()
let failed_task = backend
.execute_query(SqlXQuery::FailTask, pool, query_params)
.await?
.unwrap_task();
Expand All @@ -622,16 +624,16 @@ impl AsyncQueueable for AsyncQueue {
) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;

let pool = self.pool.as_ref().unwrap();
let pool = &self.pool;
let backend = pool.backend()?;

let query_params = QueryParams::builder()
.backoff_seconds(backoff_seconds)
.error_message(error)
.task(task)
.build();

let failed_task = pool
.backend()
let failed_task = backend
.execute_query(SqlXQuery::RetryTask, pool, query_params)
.await?
.unwrap_task();
Expand Down Expand Up @@ -662,14 +664,7 @@ impl AsyncQueue {
let create_query: &str = &format!("CREATE DATABASE {} WITH TEMPLATE fang;", db_name);
let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res
.pool
.as_mut()
.unwrap()
.unwrap_pg_pool()
.acquire()
.await
.unwrap();
let mut conn = res.pool.unwrap_pg_pool().acquire().await.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();
Expand All @@ -688,7 +683,7 @@ impl AsyncQueue {
log::info!("Database {db_name} created !!");

res.connected = false;
res.pool = None;
res.pool = InternalPool::NoBackend;
res.uri = format!("{}/{}", base_url, db_name);
res.connect().await.unwrap();

Expand Down Expand Up @@ -757,14 +752,7 @@ impl AsyncQueue {

let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res
.pool
.as_mut()
.unwrap()
.unwrap_mysql_pool()
.acquire()
.await
.unwrap();
let mut conn = res.pool.unwrap_mysql_pool().acquire().await.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();
Expand All @@ -783,7 +771,7 @@ impl AsyncQueue {
log::info!("Database {db_name} created !!");

res.connected = false;
res.pool = None;
res.pool = InternalPool::NoBackend;
res.uri = format!("{}/{}", base_url, db_name);
res.connect().await.unwrap();

Expand Down

0 comments on commit edf744a

Please sign in to comment.