Skip to content

Commit

Permalink
independent decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
pxp9 committed Apr 14, 2024
1 parent 0e59175 commit 69a2c57
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 37 deletions.
91 changes: 62 additions & 29 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,30 @@ use chrono::DateTime;
use chrono::Utc;
use cron::Schedule;
//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7
use sqlx::any::AnyKind;
use sqlx::pool::PoolOptions;
use sqlx::Any;
use sqlx::AnyPool;
use sqlx::Database;

use sqlx::MySql;
use sqlx::MySqlPool;
use sqlx::Pool;
use sqlx::Postgres;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use std::any::Any;
use std::str::FromStr;
use thiserror::Error;
use typed_builder::TypedBuilder;
use uuid::Uuid;

#[cfg(feature = "asynk-postgres")]
use sqlx::PgPool;

#[cfg(feature = "asynk-mysql")]
use sqlx::MySqlPool;

#[cfg(feature = "asynk-sqlite")]
use sqlx::SqlitePool;

#[cfg(test)]
use self::async_queue_tests::test_asynk_queue;

Expand Down Expand Up @@ -136,9 +150,12 @@ pub trait AsyncQueueable: Send {
///
#[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue {
pub struct AsyncQueue<DB>
where
DB: Database,
{
#[builder(default=None, setter(skip))]
pool: Option<AnyPool>,
pool: Option<Pool<DB>>,
#[builder(setter(into))]
uri: String,
#[builder(setter(into))]
Expand Down Expand Up @@ -172,20 +189,29 @@ use std::env;

use super::backend_sqlx::BackendSqlX;

fn get_backend(_anykind: AnyKind) -> BackendSqlX {
match _anykind {
#[cfg(feature = "asynk-postgres")]
AnyKind::Postgres => BackendSqlX::Pg,
#[cfg(feature = "asynk-mysql")]
AnyKind::MySql => BackendSqlX::MySql,
#[cfg(feature = "asynk-sqlite")]
AnyKind::Sqlite => BackendSqlX::Sqlite,
#[allow(unreachable_patterns)]
_ => unreachable!(),
fn get_backend<'a, DB: Database>(pool: &'a Pool<DB>) -> BackendSqlX {
let type_pool = pool.type_id();
#[cfg(feature = "asynk-postgres")]
if std::any::TypeId::of::<PgPool>() == type_pool {
return BackendSqlX::Pg;
}
#[cfg(feature = "asynk-mysql")]
if std::any::TypeId::of::<MySqlPool>() == type_pool {
return BackendSqlX::MySql;
}

#[cfg(feature = "asynk-sqlite")]
if std::any::TypeId::of::<SqlitePool>() == type_pool {
return BackendSqlX::Sqlite;
}

unreachable!()
}

impl AsyncQueue {
impl<DB> AsyncQueue<DB>
where
DB: Database,
{
/// Check if the connection with db is established
pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> {
if self.connected {
Expand All @@ -199,22 +225,20 @@ impl AsyncQueue {
pub async fn connect(&mut self) -> Result<(), AsyncQueueError> {
//install_default_drivers();

let pool: AnyPool = PoolOptions::new()
let pool: Pool<DB> = PoolOptions::new()
.max_connections(self.max_pool_size)
.connect(&self.uri)
.await?;

let anykind = pool.any_kind();

self.backend = get_backend(anykind);
self.backend = get_backend(&pool);

self.pool = Some(pool);
self.connected = true;
Ok(())
}

async fn fetch_and_touch_task_query(
pool: &Pool<Any>,
pool: &Pool<DB>,
backend: &BackendSqlX,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> {
Expand Down Expand Up @@ -250,7 +274,7 @@ impl AsyncQueue {
}

async fn insert_task_query(
pool: &Pool<Any>,
pool: &Pool<DB>,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
Expand All @@ -271,7 +295,7 @@ impl AsyncQueue {
}

async fn insert_task_if_not_exist_query(
pool: &Pool<Any>,
pool: &Pool<DB>,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
Expand All @@ -292,7 +316,7 @@ impl AsyncQueue {
}

async fn schedule_task_query(
pool: &Pool<Any>,
pool: &Pool<DB>,
backend: &BackendSqlX,
task: &dyn AsyncRunnable,
) -> Result<Task, AsyncQueueError> {
Expand Down Expand Up @@ -334,7 +358,10 @@ impl AsyncQueue {
}

#[async_trait]
impl AsyncQueueable for AsyncQueue {
impl<DB> AsyncQueueable for AsyncQueue<DB>
where
DB: Database,
{
async fn find_task_by_id(&mut self, id: &Uuid) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let pool = self.pool.as_ref().unwrap();
Expand Down Expand Up @@ -554,7 +581,7 @@ impl AsyncQueueable for AsyncQueue {
}

#[cfg(test)]
impl AsyncQueue {
impl AsyncQueue<Postgres> {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_postgres() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand Down Expand Up @@ -600,7 +627,10 @@ impl AsyncQueue {

res
}
}

#[cfg(test)]
impl AsyncQueue<Sqlite> {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_sqlite() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand Down Expand Up @@ -632,7 +662,10 @@ impl AsyncQueue {
res.connect().await.expect("fail to connect");
res
}
}

#[cfg(test)]
impl AsyncQueue<MySql> {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_mysql() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand Down Expand Up @@ -685,10 +718,10 @@ impl AsyncQueue {
}

#[cfg(test)]
test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()}
test_asynk_queue! {postgres, crate::AsyncQueue<Postgres>, crate::AsyncQueue::test_postgres()}

#[cfg(test)]
test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()}
test_asynk_queue! {sqlite, crate::AsyncQueue<Sqlite>, crate::AsyncQueue::test_sqlite()}

#[cfg(test)]
test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()}
test_asynk_queue! {mysql, crate::AsyncQueue<MySql>, crate::AsyncQueue::test_mysql()}
6 changes: 5 additions & 1 deletion fang/src/asynk/async_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ mod async_worker_tests {
use chrono::Duration;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::Database;

#[derive(Serialize, Deserialize)]
struct WorkerAsyncTask {
Expand Down Expand Up @@ -563,7 +564,10 @@ mod async_worker_tests {
assert_eq!(id2, task2.id);
}

async fn insert_task(test: &mut AsyncQueue, task: &dyn AsyncRunnable) -> Task {
async fn insert_task<DB: Database>(
test: &mut AsyncQueue<DB>,
task: &dyn AsyncRunnable,
) -> Task {
test.insert_task(task).await.unwrap()
}

Expand Down
9 changes: 5 additions & 4 deletions fang/src/asynk/backend_sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use chrono::{DateTime, Duration, Utc};
use sha2::Digest;
use sha2::Sha256;
use sqlx::Any;
use sqlx::Database;
use sqlx::Pool;
use std::fmt::Debug;
use typed_builder::TypedBuilder;
Expand Down Expand Up @@ -80,10 +81,10 @@ impl Res {
}

impl BackendSqlX {
pub(crate) async fn execute_query<'a>(
pub(crate) async fn execute_query<'a, DB: Database>(
&self,
_query: SqlXQuery,
_pool: &Pool<Any>,
_pool: &Pool<DB>,
_params: QueryParams<'_>,
) -> Result<Res, AsyncQueueError> {
match self {
Expand Down Expand Up @@ -212,9 +213,9 @@ async fn general_any_impl_insert_task_uniq(
}

#[allow(dead_code)]
async fn general_any_impl_update_task_state(
async fn general_any_impl_update_task_state<DB: Database>(
query: &str,
pool: &Pool<Any>,
pool: &Pool<DB>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let updated_at_str = format!("{}", Utc::now().format("%F %T%.f+00"));
Expand Down
7 changes: 4 additions & 3 deletions fang/src/asynk/backend_sqlx/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const RETRY_TASK_QUERY_POSTGRES: &str = include_str!("../queries_postgres/retry_
#[derive(Debug, Clone)]
pub(super) struct BackendSqlXPg {}

use sqlx::Database;
use SqlXQuery as Q;

use crate::AsyncQueueError;
Expand All @@ -41,12 +42,12 @@ use super::general_any_impl_remove_task_type;
use super::general_any_impl_retry_task;
use super::general_any_impl_update_task_state;
use super::{QueryParams, Res, SqlXQuery};
use sqlx::{Any, Pool};
use sqlx::Pool;

impl BackendSqlXPg {
pub(super) async fn execute_query(
pub(super) async fn execute_query<DB: Database>(
query: SqlXQuery,
pool: &Pool<Any>,
pool: &Pool<DB>,
params: QueryParams<'_>,
) -> Result<Res, AsyncQueueError> {
match query {
Expand Down

0 comments on commit 69a2c57

Please sign in to comment.