Skip to content

Commit

Permalink
fix: add sample for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Jul 18, 2024
1 parent d19f46c commit 457c742
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 137 deletions.
75 changes: 49 additions & 26 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,12 @@ impl crate::executor::Executor for TestExecutor {
/// Test utilities that allows you to test backends
pub mod test_utils {
use crate::error::BoxDynError;

use crate::executor::Executor;
use crate::request::Request;

use crate::task::task_id::TaskId;
use crate::worker::WorkerId;
use crate::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt};
use futures::{Future, FutureExt, SinkExt};
use std::fmt::Debug;
Expand Down Expand Up @@ -217,18 +215,38 @@ pub mod test_utils {
_p: PhantomData<Req>,
backend: B,
}

// impl<B: Clone, Req> Clone for TestWrapper<B, Req> {
// fn clone(&self) -> Self {
// TestWrapper {
// stop_tx: self.stop_tx.clone(),
// res_rx: self.res_rx.clone(),
// _p: PhantomData,
// backend: self.backend.clone(),
// }
// }
// }

/// A test wrapper to allow you to test without requiring a worker.
/// Important for testing backends and jobs
/// # Example
/// ```no_run
/// #[cfg(tests)]
/// mod tests {
/// use crate::{
/// error::Error, memory::MemoryStorage, mq::MessageQueue, service_fn::service_fn,
/// };
///
/// use super::*;
///
/// async fn is_even(req: usize) -> Result<(), Error> {
/// if req % 2 == 0 {
/// Ok(())
/// } else {
/// Err(Error::Abort("Not an even number".to_string()))
/// }
/// }
///
/// #[tokio::test]
/// async fn test_accepts_even() {
/// let backend = MemoryStorage::new();
/// let (mut tester, poller) = TestWrapper::new_with_service(backend, service_fn(is_even));
/// tokio::spawn(poller);
/// tester.enqueue(42usize).await.unwrap();
/// assert_eq!(tester.size().await.unwrap(), 1);
/// let (_, resp) = tester.execute_next().await;
/// assert_eq!(resp, Ok("()".to_string()));
/// }
///}
/// ````
impl<B, Req> TestWrapper<B, Req>
where
B: Backend<Request<Req>> + Send + Sync + 'static + Clone,
Expand All @@ -237,7 +255,7 @@ pub mod test_utils {
B::Stream: Stream<Item = Result<Option<Request<Req>>, crate::error::Error>> + Unpin,
{
/// Build a new instance provided a custom service
pub fn new_with_service<S, E: Executor>(backend: B, service: S, executor: E) -> Self
pub fn new_with_service<S>(backend: B, service: S) -> (Self, BoxFuture<'static, ()>)
where
S: Service<Request<Req>> + Send + 'static,
B::Layer: Layer<S>,
Expand All @@ -263,7 +281,8 @@ pub mod test_utils {
item = poller.stream.next().fuse() => match item {
Some(Ok(Some(req))) => {

let task_id = req.get::<TaskId>().cloned().expect("Request does not contain Task_ID");
let task_id = req.get::<TaskId>().cloned().unwrap_or_default();
// .expect("Request does not contain Task_ID");
// handle request
match service.call(req).await {
Ok(res) => {
Expand All @@ -287,13 +306,15 @@ pub mod test_utils {
}
}
};
executor.spawn(poller);
Self {
stop_tx,
res_rx,
_p: PhantomData,
backend,
}
(
Self {
stop_tx,
res_rx,
_p: PhantomData,
backend,
},
poller.boxed(),
)
}

/// Stop polling
Expand Down Expand Up @@ -339,7 +360,8 @@ pub mod test_utils {
let service = apalis_test_service_fn(|request: Request<u32>| async {
Ok::<_, io::Error>(request)
});
let mut t = TestWrapper::new_with_service(backend, service, TokioExecutor);
let (mut t, poller) = TestWrapper::new_with_service(backend, service);
tokio::spawn(poller);
t.enqueue(1).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let _res = t.execute_next().await;
Expand All @@ -357,7 +379,8 @@ pub mod test_utils {
let service = apalis_test_service_fn(|request: Request<u32>| async move {
Ok::<_, io::Error>(request.take())
});
let mut t = TestWrapper::new_with_service(backend, service, TokioExecutor);
let (mut t, poller) = TestWrapper::new_with_service(backend, service);
tokio::spawn(poller);
let res = t.len().await.unwrap();
assert_eq!(res, 0); // No jobs
t.push(1).await.unwrap();
Expand Down
9 changes: 2 additions & 7 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,17 +990,12 @@ mod tests {
use apalis_core::test_utils::apalis_test_service_fn;
use apalis_core::test_utils::TestWrapper;

generic_storage_test!({
let redis_url = std::env::var("REDIS_URL").expect("No REDIS_URL is specified");
let conn = connect(redis_url).await.unwrap();
let storage = RedisStorage::new(conn);
storage
});
generic_storage_test!(setup);

use super::*;

/// migrate DB and return a storage instance.
async fn setup() -> RedisStorage<Email> {
async fn setup<T: Serialize + DeserializeOwned>() -> RedisStorage<T> {
let redis_url = std::env::var("REDIS_URL").expect("No REDIS_URL is specified");
// Because connections cannot be shared across async runtime
// (different runtimes are created for each test),
Expand Down
1 change: 1 addition & 0 deletions packages/apalis-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ apalis = { path = "../../", default-features = false, features = [
] }
once_cell = "1.19.0"
apalis-sql = { path = ".", features = ["tokio-comp"] }
apalis-core = { path = "../apalis-core", features = ["test-utils"] }

[package.metadata.docs.rs]
# defines the configuration attribute `docsrs`
Expand Down
7 changes: 4 additions & 3 deletions packages/apalis-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,12 @@ pub(crate) fn calculate_status(res: &Result<String, String>) -> State {
macro_rules! sql_storage_tests {
($setup:path, $storage_type:ty, $job_type:ty) => {
async fn setup_test_wrapper() -> TestWrapper<$storage_type, $job_type> {
TestWrapper::new_with_service(
let (t, poller) = TestWrapper::new_with_service(
$setup().await,
apalis_core::service_fn::service_fn(email_service::send_email),
TokioExecutor,
)
);
tokio::spawn(poller);
t
}

#[tokio::test]
Expand Down
3 changes: 1 addition & 2 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ mod tests {
use crate::sql_storage_tests;

use super::*;
use apalis::utils::TokioExecutor;
use apalis_core::task::attempt::Attempt;

use apalis_core::test_utils::DummyService;
Expand All @@ -530,7 +529,7 @@ mod tests {
sql_storage_tests!(setup::<Email>, MysqlStorage<Email>, Email);

/// migrate DB and return a storage instance.
async fn setup<T>() -> MysqlStorage<T> {
async fn setup<T: Serialize + DeserializeOwned>() -> MysqlStorage<T> {
let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
// Because connections cannot be shared across async runtime
// (different runtimes are created for each test),
Expand Down
1 change: 0 additions & 1 deletion packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,6 @@ mod tests {
use crate::sql_storage_tests;

use super::*;
use apalis::utils::TokioExecutor;
use apalis_core::task::attempt::Attempt;
use apalis_core::test_utils::DummyService;
use chrono::Utc;
Expand Down
98 changes: 0 additions & 98 deletions packages/apalis-sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,6 @@ mod tests {
use crate::sql_storage_tests;

use super::*;
use apalis::utils::TokioExecutor;
use apalis_core::service_fn::service_fn;
use apalis_core::task::attempt::Attempt;
use apalis_core::test_utils::DummyService;
use chrono::Utc;
Expand Down Expand Up @@ -708,99 +706,3 @@ mod tests {
assert_eq!(*ctx.lock_by(), Some(worker_id));
}
}

#[cfg(test)]
mod backend_tests {
use std::ops::DerefMut;

use apalis::utils::TokioExecutor;
use apalis_core::{service_fn::service_fn, storage::Storage, test_utils::TestWrapper};
use email_service::{
example_good_email, example_killed_email, example_retry_able_email, Email,
};
use sqlx::SqlitePool;

use crate::context::{SqlContext, State};

use super::SqliteStorage;

/// migrate DB and return a storage instance.
async fn setup() -> SqliteStorage<Email> {
// Because connections cannot be shared across async runtime
// (different runtimes are created for each test),
// we don't share the storage and tests must be run sequentially.
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SqliteStorage::setup(&pool)
.await
.expect("failed to migrate DB");
let storage = SqliteStorage::<Email>::new(pool);

storage
}

async fn setup_test_wrapper() -> TestWrapper<SqliteStorage<Email>, Email> {
TestWrapper::new_with_service(
setup().await,
service_fn(email_service::send_email),
TokioExecutor,
)
}

#[tokio::test]
async fn test_kill_job() {
let mut storage = setup_test_wrapper().await;

storage.push(example_killed_email()).await.unwrap();

let (job_id, res) = storage.execute_next().await;
assert_eq!(
res,
Err("AbortError: Invalid character. Job killed".to_owned())
);
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
let ctx = job.get::<SqlContext>().unwrap();
assert_eq!(*ctx.status(), State::Killed);
assert!(ctx.done_at().is_some());
assert_eq!(
ctx.last_error().clone().unwrap(),
"{\"Err\":\"AbortError: Invalid character. Job killed\"}"
);
}

#[tokio::test]
async fn test_acknowledge_good_job() {
let mut storage = setup_test_wrapper().await;
storage.push(example_good_email()).await.unwrap();

let (job_id, res) = storage.execute_next().await;
assert_eq!(res, Ok("()".to_owned()));
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
let ctx = job.get::<SqlContext>().unwrap();
assert_eq!(*ctx.status(), State::Done);
assert!(ctx.done_at().is_some());
}

#[tokio::test]
async fn test_acknowledge_failed_job() {
let mut storage = setup_test_wrapper().await;

storage.push(example_retry_able_email()).await.unwrap();

for index in 1..25 {
let (job_id, res) = storage.execute_next().await;
assert_eq!(
res,
Err("FailedError: Missing separator character '@'.".to_owned())
);
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
let ctx = job.get::<SqlContext>().unwrap();
assert_eq!(*ctx.status(), State::Failed);
assert_eq!(ctx.attempts().current(), index);
assert!(ctx.done_at().is_some());
assert_eq!(
ctx.last_error().clone().unwrap(),
"{\"Err\":\"FailedError: Missing separator character '@'.\"}"
);
}
}
}

0 comments on commit 457c742

Please sign in to comment.