diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs index 19b1265..500bdb5 100644 --- a/packages/apalis-core/src/lib.rs +++ b/packages/apalis-core/src/lib.rs @@ -173,14 +173,12 @@ impl crate::executor::Executor for TestExecutor { /// Test utilities that allows you to test backends pub mod test_utils { use crate::error::BoxDynError; - - use crate::executor::Executor; use crate::request::Request; - use crate::task::task_id::TaskId; use crate::worker::WorkerId; use crate::Backend; use futures::channel::mpsc::{channel, Receiver, Sender}; + use futures::future::BoxFuture; use futures::stream::{Stream, StreamExt}; use futures::{Future, FutureExt, SinkExt}; use std::fmt::Debug; @@ -217,18 +215,38 @@ pub mod test_utils { _p: PhantomData, backend: B, } - - // impl Clone for TestWrapper { - // fn clone(&self) -> Self { - // TestWrapper { - // stop_tx: self.stop_tx.clone(), - // res_rx: self.res_rx.clone(), - // _p: PhantomData, - // backend: self.backend.clone(), - // } - // } - // } - + /// A test wrapper to allow you to test without requiring a worker. + /// Important for testing backends and jobs + /// # Example + /// ```no_run + /// #[cfg(tests)] + /// mod tests { + /// use crate::{ + /// error::Error, memory::MemoryStorage, mq::MessageQueue, service_fn::service_fn, + /// }; + /// + /// use super::*; + /// + /// async fn is_even(req: usize) -> Result<(), Error> { + /// if req % 2 == 0 { + /// Ok(()) + /// } else { + /// Err(Error::Abort("Not an even number".to_string())) + /// } + /// } + /// + /// #[tokio::test] + /// async fn test_accepts_even() { + /// let backend = MemoryStorage::new(); + /// let (mut tester, poller) = TestWrapper::new_with_service(backend, service_fn(is_even)); + /// tokio::spawn(poller); + /// tester.enqueue(42usize).await.unwrap(); + /// assert_eq!(tester.size().await.unwrap(), 1); + /// let (_, resp) = tester.execute_next().await; + /// assert_eq!(resp, Ok("()".to_string())); + /// } + ///} + /// ```` impl TestWrapper where B: Backend> + Send + Sync + 'static + Clone, @@ -237,7 +255,7 @@ pub mod test_utils { B::Stream: Stream>, crate::error::Error>> + Unpin, { /// Build a new instance provided a custom service - pub fn new_with_service(backend: B, service: S, executor: E) -> Self + pub fn new_with_service(backend: B, service: S) -> (Self, BoxFuture<'static, ()>) where S: Service> + Send + 'static, B::Layer: Layer, @@ -263,7 +281,8 @@ pub mod test_utils { item = poller.stream.next().fuse() => match item { Some(Ok(Some(req))) => { - let task_id = req.get::().cloned().expect("Request does not contain Task_ID"); + let task_id = req.get::().cloned().unwrap_or_default(); + // .expect("Request does not contain Task_ID"); // handle request match service.call(req).await { Ok(res) => { @@ -287,13 +306,15 @@ pub mod test_utils { } } }; - executor.spawn(poller); - Self { - stop_tx, - res_rx, - _p: PhantomData, - backend, - } + ( + Self { + stop_tx, + res_rx, + _p: PhantomData, + backend, + }, + poller.boxed(), + ) } /// Stop polling @@ -339,7 +360,8 @@ pub mod test_utils { let service = apalis_test_service_fn(|request: Request| async { Ok::<_, io::Error>(request) }); - let mut t = TestWrapper::new_with_service(backend, service, TokioExecutor); + let (mut t, poller) = TestWrapper::new_with_service(backend, service); + tokio::spawn(poller); t.enqueue(1).await.unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; let _res = t.execute_next().await; @@ -357,7 +379,8 @@ pub mod test_utils { let service = apalis_test_service_fn(|request: Request| async move { Ok::<_, io::Error>(request.take()) }); - let mut t = TestWrapper::new_with_service(backend, service, TokioExecutor); + let (mut t, poller) = TestWrapper::new_with_service(backend, service); + tokio::spawn(poller); let res = t.len().await.unwrap(); assert_eq!(res, 0); // No jobs t.push(1).await.unwrap(); diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index cf8fe1b..958c53a 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -990,17 +990,12 @@ mod tests { use apalis_core::test_utils::apalis_test_service_fn; use apalis_core::test_utils::TestWrapper; - generic_storage_test!({ - let redis_url = std::env::var("REDIS_URL").expect("No REDIS_URL is specified"); - let conn = connect(redis_url).await.unwrap(); - let storage = RedisStorage::new(conn); - storage - }); + generic_storage_test!(setup); use super::*; /// migrate DB and return a storage instance. - async fn setup() -> RedisStorage { + async fn setup() -> RedisStorage { let redis_url = std::env::var("REDIS_URL").expect("No REDIS_URL is specified"); // Because connections cannot be shared across async runtime // (different runtimes are created for each test), diff --git a/packages/apalis-sql/Cargo.toml b/packages/apalis-sql/Cargo.toml index d548bdb..c586da9 100644 --- a/packages/apalis-sql/Cargo.toml +++ b/packages/apalis-sql/Cargo.toml @@ -46,6 +46,7 @@ apalis = { path = "../../", default-features = false, features = [ ] } once_cell = "1.19.0" apalis-sql = { path = ".", features = ["tokio-comp"] } +apalis-core = { path = "../apalis-core", features = ["test-utils"] } [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/packages/apalis-sql/src/lib.rs b/packages/apalis-sql/src/lib.rs index 75da76a..4588894 100644 --- a/packages/apalis-sql/src/lib.rs +++ b/packages/apalis-sql/src/lib.rs @@ -145,11 +145,12 @@ pub(crate) fn calculate_status(res: &Result) -> State { macro_rules! sql_storage_tests { ($setup:path, $storage_type:ty, $job_type:ty) => { async fn setup_test_wrapper() -> TestWrapper<$storage_type, $job_type> { - TestWrapper::new_with_service( + let (t, poller) = TestWrapper::new_with_service( $setup().await, apalis_core::service_fn::service_fn(email_service::send_email), - TokioExecutor, - ) + ); + tokio::spawn(poller); + t } #[tokio::test] diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index 7d2bc84..83dcea5 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -514,7 +514,6 @@ mod tests { use crate::sql_storage_tests; use super::*; - use apalis::utils::TokioExecutor; use apalis_core::task::attempt::Attempt; use apalis_core::test_utils::DummyService; @@ -530,7 +529,7 @@ mod tests { sql_storage_tests!(setup::, MysqlStorage, Email); /// migrate DB and return a storage instance. - async fn setup() -> MysqlStorage { + async fn setup() -> MysqlStorage { let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); // Because connections cannot be shared across async runtime // (different runtimes are created for each test), diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index bfe6e26..ed5c102 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -606,7 +606,6 @@ mod tests { use crate::sql_storage_tests; use super::*; - use apalis::utils::TokioExecutor; use apalis_core::task::attempt::Attempt; use apalis_core::test_utils::DummyService; use chrono::Utc; diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 05f2c17..5991e61 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -506,8 +506,6 @@ mod tests { use crate::sql_storage_tests; use super::*; - use apalis::utils::TokioExecutor; - use apalis_core::service_fn::service_fn; use apalis_core::task::attempt::Attempt; use apalis_core::test_utils::DummyService; use chrono::Utc; @@ -708,99 +706,3 @@ mod tests { assert_eq!(*ctx.lock_by(), Some(worker_id)); } } - -#[cfg(test)] -mod backend_tests { - use std::ops::DerefMut; - - use apalis::utils::TokioExecutor; - use apalis_core::{service_fn::service_fn, storage::Storage, test_utils::TestWrapper}; - use email_service::{ - example_good_email, example_killed_email, example_retry_able_email, Email, - }; - use sqlx::SqlitePool; - - use crate::context::{SqlContext, State}; - - use super::SqliteStorage; - - /// migrate DB and return a storage instance. - async fn setup() -> SqliteStorage { - // Because connections cannot be shared across async runtime - // (different runtimes are created for each test), - // we don't share the storage and tests must be run sequentially. - let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - SqliteStorage::setup(&pool) - .await - .expect("failed to migrate DB"); - let storage = SqliteStorage::::new(pool); - - storage - } - - async fn setup_test_wrapper() -> TestWrapper, Email> { - TestWrapper::new_with_service( - setup().await, - service_fn(email_service::send_email), - TokioExecutor, - ) - } - - #[tokio::test] - async fn test_kill_job() { - let mut storage = setup_test_wrapper().await; - - storage.push(example_killed_email()).await.unwrap(); - - let (job_id, res) = storage.execute_next().await; - assert_eq!( - res, - Err("AbortError: Invalid character. Job killed".to_owned()) - ); - let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); - let ctx = job.get::().unwrap(); - assert_eq!(*ctx.status(), State::Killed); - assert!(ctx.done_at().is_some()); - assert_eq!( - ctx.last_error().clone().unwrap(), - "{\"Err\":\"AbortError: Invalid character. Job killed\"}" - ); - } - - #[tokio::test] - async fn test_acknowledge_good_job() { - let mut storage = setup_test_wrapper().await; - storage.push(example_good_email()).await.unwrap(); - - let (job_id, res) = storage.execute_next().await; - assert_eq!(res, Ok("()".to_owned())); - let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); - let ctx = job.get::().unwrap(); - assert_eq!(*ctx.status(), State::Done); - assert!(ctx.done_at().is_some()); - } - - #[tokio::test] - async fn test_acknowledge_failed_job() { - let mut storage = setup_test_wrapper().await; - - storage.push(example_retry_able_email()).await.unwrap(); - - for index in 1..25 { - let (job_id, res) = storage.execute_next().await; - assert_eq!( - res, - Err("FailedError: Missing separator character '@'.".to_owned()) - ); - let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); - let ctx = job.get::().unwrap(); - assert_eq!(*ctx.status(), State::Failed); - assert_eq!(ctx.attempts().current(), index); - assert!(ctx.done_at().is_some()); - assert_eq!( - ctx.last_error().clone().unwrap(), - "{\"Err\":\"FailedError: Missing separator character '@'.\"}" - ); - } - } -}