From d19f46c633b900f42afa82b5beadfa031676e41d Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Thu, 18 Jul 2024 17:47:30 +0300 Subject: [PATCH] fix: introduce testwrapper and add more tests --- examples/email-service/Cargo.toml | 3 +- examples/email-service/src/lib.rs | 43 +++++- examples/redis-with-msg-pack/src/main.rs | 6 +- examples/redis/src/main.rs | 2 +- packages/apalis-core/Cargo.toml | 3 +- packages/apalis-core/src/codec/json.rs | 15 +- packages/apalis-core/src/error.rs | 20 +-- packages/apalis-core/src/lib.rs | 146 +++++++++---------- packages/apalis-core/src/memory.rs | 5 +- packages/apalis-core/src/request.rs | 5 +- packages/apalis-core/src/response.rs | 18 ++- packages/apalis-core/src/worker/mod.rs | 2 +- packages/apalis-cron/src/lib.rs | 3 +- packages/apalis-redis/src/storage.rs | 7 +- packages/apalis-sql/Cargo.toml | 1 + packages/apalis-sql/src/context.rs | 6 +- packages/apalis-sql/src/lib.rs | 82 ++++++++++- packages/apalis-sql/src/mysql.rs | 36 ++--- packages/apalis-sql/src/postgres.rs | 50 +++---- packages/apalis-sql/src/sqlite.rs | 178 +++++++++++++++++------ src/layers/catch_panic/mod.rs | 5 +- 21 files changed, 414 insertions(+), 222 deletions(-) diff --git a/examples/email-service/Cargo.toml b/examples/email-service/Cargo.toml index 8ede34e..3aca96a 100644 --- a/examples/email-service/Cargo.toml +++ b/examples/email-service/Cargo.toml @@ -8,4 +8,5 @@ apalis = { path = "../../", default-features = false } futures-util = "0.3.0" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -log = "0.4" \ No newline at end of file +log = "0.4" +email_address = "0.2.5" diff --git a/examples/email-service/src/lib.rs b/examples/email-service/src/lib.rs index f5f833d..467de89 100644 --- a/examples/email-service/src/lib.rs +++ b/examples/email-service/src/lib.rs @@ -1,3 +1,7 @@ +use std::{str::FromStr, sync::Arc}; + +use apalis::prelude::*; +use email_address::EmailAddress; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -7,8 +11,43 @@ pub struct Email { pub text: String, } -pub async fn send_email(job: Email) { - log::info!("Attempting to send email to {}", job.to); +pub async fn send_email(job: Email) -> Result<(), Error> { + let validation = EmailAddress::from_str(&job.to); + match validation { + Ok(email) => { + log::info!("Attempting to send email to {}", email.as_str()); + Ok(()) + } + Err(email_address::Error::InvalidCharacter) => { + log::error!("Killed send email job. Invalid character {}", job.to); + Err(Error::Abort(String::from("Invalid character. Job killed"))) + } + Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))), + } +} + +pub fn example_good_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@gmail.com".to_string(), + text: "Some Text".to_string(), + } +} + +pub fn example_killed_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@©.com".to_string(), // killed because it has © which is invalid + text: "Some Text".to_string(), + } +} + +pub fn example_retry_able_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example".to_string(), + text: "Some Text".to_string(), + } } pub const FORM_HTML: &str = r#" diff --git a/examples/redis-with-msg-pack/src/main.rs b/examples/redis-with-msg-pack/src/main.rs index 9a37165..4afa89a 100644 --- a/examples/redis-with-msg-pack/src/main.rs +++ b/examples/redis-with-msg-pack/src/main.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use anyhow::Result; use apalis::prelude::*; @@ -13,11 +13,11 @@ struct MessagePack; impl Codec> for MessagePack { type Error = Error; fn encode(&self, input: &T) -> Result, Self::Error> { - rmp_serde::to_vec(input).map_err(|e| Error::SourceError(Box::new(e))) + rmp_serde::to_vec(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &Vec) -> Result { - rmp_serde::from_slice(compact).map_err(|e| Error::SourceError(Box::new(e))) + rmp_serde::from_slice(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } diff --git a/examples/redis/src/main.rs b/examples/redis/src/main.rs index 3a35278..b0c9dd9 100644 --- a/examples/redis/src/main.rs +++ b/examples/redis/src/main.rs @@ -47,7 +47,7 @@ async fn main() -> Result<()> { produce_jobs(storage.clone()).await?; let worker = WorkerBuilder::new("rango-tango") - .chain(|svc| svc.map_err(Error::Failed)) + .chain(|svc| svc.map_err(|e| Error::Failed(Arc::new(e)))) .layer(RateLimitLayer::new(5, Duration::from_secs(1))) .layer(TimeoutLayer::new(Duration::from_millis(500))) .data(Count::default()) diff --git a/packages/apalis-core/Cargo.toml b/packages/apalis-core/Cargo.toml index 2a809bf..b04ad9c 100644 --- a/packages/apalis-core/Cargo.toml +++ b/packages/apalis-core/Cargo.toml @@ -29,10 +29,11 @@ optional = true [features] -default = [] +default = ["test-utils"] docsrs = ["document-features"] sleep = ["futures-timer"] json = ["serde_json"] +test-utils = [] [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/packages/apalis-core/src/codec/json.rs b/packages/apalis-core/src/codec/json.rs index 7ed7c7f..ef85854 100644 --- a/packages/apalis-core/src/codec/json.rs +++ b/packages/apalis-core/src/codec/json.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{error::Error, Codec}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; @@ -9,32 +11,33 @@ pub struct JsonCodec; impl Codec> for JsonCodec { type Error = Error; fn encode(&self, input: &T) -> Result, Self::Error> { - serde_json::to_vec(input).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::to_vec(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &Vec) -> Result { - serde_json::from_slice(compact).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::from_slice(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } impl Codec for JsonCodec { type Error = Error; fn encode(&self, input: &T) -> Result { - serde_json::to_string(input).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::to_string(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &String) -> Result { - serde_json::from_str(compact).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::from_str(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } impl Codec for JsonCodec { type Error = Error; fn encode(&self, input: &T) -> Result { - serde_json::to_value(input).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::to_value(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } fn decode(&self, compact: &Value) -> Result { - serde_json::from_value(compact.clone()).map_err(|e| Error::SourceError(Box::new(e))) + serde_json::from_value(compact.clone()) + .map_err(|e| Error::SourceError(Arc::new(Box::new(e)))) } } diff --git a/packages/apalis-core/src/error.rs b/packages/apalis-core/src/error.rs index 0f52699..6c81286 100644 --- a/packages/apalis-core/src/error.rs +++ b/packages/apalis-core/src/error.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::{error::Error as StdError, sync::Arc}; use thiserror::Error; use crate::worker::WorkerError; @@ -7,28 +7,24 @@ use crate::worker::WorkerError; pub type BoxDynError = Box; /// Represents a general error returned by a task or by internals of the platform -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum Error { /// An error occurred during execution. #[error("FailedError: {0}")] - Failed(#[source] BoxDynError), + Failed(#[source] Arc), /// A generic IO error #[error("IoError: {0}")] - Io(#[from] std::io::Error), + Io(#[from] Arc), /// Missing some context and yet it was requested during execution. #[error("MissingContextError: {0}")] MissingContext(String), /// Execution was aborted - #[error("AbortError")] - Abort, - - /// Execution failed and job will be retried - #[error("RetryError: {0}")] - Retry(#[source] BoxDynError), + #[error("AbortError: {0}")] + Abort(String), /// Encountered an error during worker execution #[error("WorkerError: {0}")] @@ -38,11 +34,11 @@ pub enum Error { /// Encountered an error during service execution /// This should not be used inside a task function #[error("Encountered an error during service execution")] - ServiceError(#[source] BoxDynError), + ServiceError(#[source] Arc), #[doc(hidden)] /// Encountered an error during service execution /// This should not be used inside a task function #[error("Encountered an error during streaming")] - SourceError(#[source] BoxDynError), + SourceError(#[source] Arc), } diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs index 39dcdd8..19b1265 100644 --- a/packages/apalis-core/src/lib.rs +++ b/packages/apalis-core/src/lib.rs @@ -169,26 +169,25 @@ impl crate::executor::Executor for TestExecutor { } } +#[cfg(feature = "test-utils")] /// Test utilities that allows you to test backends pub mod test_utils { - use crate::error::{BoxDynError}; + use crate::error::BoxDynError; + use crate::executor::Executor; use crate::request::Request; use crate::task::task_id::TaskId; use crate::worker::WorkerId; use crate::Backend; - use futures::channel::mpsc::{channel, Sender}; + use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::{Future, FutureExt, SinkExt}; - use std::collections::HashMap; use std::fmt::Debug; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; - use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; - use std::thread; use tower::{Layer, Service}; /// Define a dummy service @@ -214,21 +213,21 @@ pub mod test_utils { #[derive(Debug)] pub struct TestWrapper { stop_tx: Sender<()>, - executions: Arc>>>, + res_rx: Receiver<(TaskId, Result)>, _p: PhantomData, backend: B, } - impl Clone for TestWrapper { - fn clone(&self) -> Self { - TestWrapper { - stop_tx: self.stop_tx.clone(), - executions: Arc::clone(&self.executions), - _p: PhantomData, - backend: self.backend.clone(), - } - } - } + // impl Clone for TestWrapper { + // fn clone(&self) -> Self { + // TestWrapper { + // stop_tx: self.stop_tx.clone(), + // res_rx: self.res_rx.clone(), + // _p: PhantomData, + // backend: self.backend.clone(), + // } + // } + // } impl TestWrapper where @@ -238,62 +237,60 @@ pub mod test_utils { B::Stream: Stream>, crate::error::Error>> + Unpin, { /// Build a new instance provided a custom service - pub fn new_with_service(backend: B, service: S) -> Self + pub fn new_with_service(backend: B, service: S, executor: E) -> Self where S: Service> + Send + 'static, B::Layer: Layer, <>>::Layer as Layer>::Service: Service> + Send + 'static, - <<>>::Layer as Layer>::Service as Service>>::Response: Debug, - <<>>::Layer as Layer>::Service as Service>>::Error: Send + Into + Sync + <<>>::Layer as Layer>::Service as Service>>::Response: Send + Debug, + <<>>::Layer as Layer>::Service as Service>>::Error: Send + Into + Sync, + <<>>::Layer as Layer>::Service as Service>>::Future: Send + 'static, { let worker_id = WorkerId::new("test-worker"); let b = backend.clone(); let mut poller = b.poll(worker_id); let (stop_tx, mut stop_rx) = channel::<()>(1); + let (mut res_tx, res_rx) = channel(10); + let mut service = poller.layer.layer(service); - let executions: Arc>>> = - Default::default(); - let executions_clone = executions.clone(); - thread::spawn(move || { - futures::executor::block_on(async move { - let heartbeat = poller.heartbeat.shared(); - loop { - futures::select! { - - item = poller.stream.next().fuse() => match item { - Some(Ok(Some(req))) => { - - let task_id = req.get::().cloned().expect("Request does not contain Task_ID"); - // handle request - match service.call(req).await { - Ok(res) => { - executions_clone.lock().unwrap().insert(task_id, Ok(format!("{res:?}"))); - }, - Err(err) => { - executions_clone.lock().unwrap().insert(task_id, Err(err.into().to_string())); - } + let poller = async move { + let heartbeat = poller.heartbeat.shared(); + loop { + futures::select! { + + item = poller.stream.next().fuse() => match item { + Some(Ok(Some(req))) => { + + let task_id = req.get::().cloned().expect("Request does not contain Task_ID"); + // handle request + match service.call(req).await { + Ok(res) => { + res_tx.send((task_id, Ok(format!("{res:?}")))).await.unwrap(); + }, + Err(err) => { + res_tx.send((task_id, Err(err.into().to_string()))).await.unwrap(); } } - Some(Ok(None)) | None => break, - Some(Err(_e)) => { - // handle error - break; - } - }, - _ = stop_rx.next().fuse() => break, - _ = heartbeat.clone().fuse() => { - - }, - } + } + Some(Ok(None)) | None => break, + Some(Err(_e)) => { + // handle error + break; + } + }, + _ = stop_rx.next().fuse() => break, + _ = heartbeat.clone().fuse() => { + + }, } - }); - }); - + } + }; + executor.spawn(poller); Self { stop_tx, - executions, + res_rx, _p: PhantomData, backend, } @@ -305,8 +302,8 @@ pub mod test_utils { } /// Gets the current state of results - pub fn get_results(&self) -> HashMap> { - self.executions.lock().unwrap().clone() + pub async fn execute_next(&mut self) -> (TaskId, Result) { + self.res_rx.next().await.unwrap() } } @@ -342,33 +339,34 @@ pub mod test_utils { let service = apalis_test_service_fn(|request: Request| async { Ok::<_, io::Error>(request) }); - let mut t = TestWrapper::new_with_service(backend, service); - let res = t.get_results(); - assert_eq!(res.len(), 0); // No job is done + let mut t = TestWrapper::new_with_service(backend, service, TokioExecutor); t.enqueue(1).await.unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; - let res = t.get_results(); - assert_eq!(res.len(), 1); // One job is done + let _res = t.execute_next().await; + // assert_eq!(res.len(), 1); // One job is done } }; } #[macro_export] /// Tests a generic storage - macro_rules! test_storage { - ($backend_instance:expr) => { + macro_rules! generic_storage_test { + ($setup:path ) => { #[tokio::test] - async fn it_works_as_a_storage_backend() { - let backend = $backend_instance; - let service = apalis_test_service_fn(|request: Request| async { - Ok::<_, io::Error>(request) + async fn integration_test_storage_push_and_consume() { + let backend = $setup().await; + let service = apalis_test_service_fn(|request: Request| async move { + Ok::<_, io::Error>(request.take()) }); - let mut t = TestWrapper::new_with_service(backend, service); - let res = t.get_results(); - assert_eq!(res.len(), 0); // No job is done + let mut t = TestWrapper::new_with_service(backend, service, TokioExecutor); + let res = t.len().await.unwrap(); + assert_eq!(res, 0); // No jobs t.push(1).await.unwrap(); - ::apalis_core::sleep(Duration::from_secs(1)).await; - let res = t.get_results(); - assert_eq!(res.len(), 1); // One job is done + let res = t.len().await.unwrap(); + assert_eq!(res, 1); // A job exists + let res = t.execute_next().await; + assert_eq!(res.1, Ok("1".to_owned())); + let res = t.len().await.unwrap(); + assert_eq!(res, 0); // No jobs } }; } diff --git a/packages/apalis-core/src/memory.rs b/packages/apalis-core/src/memory.rs index 3523d6b..250190c 100644 --- a/packages/apalis-core/src/memory.rs +++ b/packages/apalis-core/src/memory.rs @@ -114,7 +114,10 @@ impl Backend> for MemoryStorage { impl MessageQueue for MemoryStorage { type Error = (); async fn enqueue(&mut self, message: Message) -> Result<(), Self::Error> { - self.inner.sender.try_send(Request::new(message)).map_err(|_| ())?; + self.inner + .sender + .try_send(Request::new(message)) + .map_err(|_| ())?; Ok(()) } diff --git a/packages/apalis-core/src/request.rs b/packages/apalis-core/src/request.rs index 29f0788..ac38147 100644 --- a/packages/apalis-core/src/request.rs +++ b/packages/apalis-core/src/request.rs @@ -4,7 +4,10 @@ use tower::layer::util::Identity; use std::{fmt::Debug, pin::Pin}; -use crate::{data::Extensions, error::Error, poller::Poller, task::task_id::TaskId, worker::WorkerId, Backend}; +use crate::{ + data::Extensions, error::Error, poller::Poller, task::task_id::TaskId, worker::WorkerId, + Backend, +}; /// Represents a job which can be serialized and executed diff --git a/packages/apalis-core/src/response.rs b/packages/apalis-core/src/response.rs index 7c2a231..efda892 100644 --- a/packages/apalis-core/src/response.rs +++ b/packages/apalis-core/src/response.rs @@ -1,4 +1,4 @@ -use std::any::Any; +use std::{any::Any, sync::Arc}; use crate::error::Error; @@ -15,22 +15,30 @@ impl IntoResponse for bool { fn into_response(self) -> std::result::Result { match self { true => Ok(true), - false => Err(Error::Failed(Box::new(std::io::Error::new( + false => Err(Error::Failed(Arc::new(Box::new(std::io::Error::new( std::io::ErrorKind::Other, "Job returned false", - )))), + ))))), } } } -impl IntoResponse +impl IntoResponse for std::result::Result { type Result = Result; fn into_response(self) -> Result { match self { Ok(value) => Ok(value), - Err(e) => Err(Error::Failed(Box::new(e))), + Err(e) => { + // Try to downcast the error to see if it is already of type `Error` + if let Some(custom_error) = + (&e as &(dyn std::error::Error + 'static)).downcast_ref::() + { + return Err(custom_error.clone()); + } + Err(Error::Failed(Arc::new(Box::new(e)))) + } } } } diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index b9c071f..6d911ec 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -135,7 +135,7 @@ pub enum Event { } /// Possible errors that can occur when starting a worker. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum WorkerError { /// An error occurred while processing a job. #[error("Failed to process job: {0}")] diff --git a/packages/apalis-cron/src/lib.rs b/packages/apalis-cron/src/lib.rs index 024fa9f..babcf24 100644 --- a/packages/apalis-cron/src/lib.rs +++ b/packages/apalis-cron/src/lib.rs @@ -68,6 +68,7 @@ use apalis_core::{error::Error, request::Request}; use chrono::{DateTime, TimeZone, Utc}; pub use cron::Schedule; use std::marker::PhantomData; +use std::sync::Arc; /// Represents a stream from a cron schedule with a timezone #[derive(Clone, Debug)] @@ -117,7 +118,7 @@ where match next { Some(next) => { let to_sleep = next - timezone.from_utc_datetime(&Utc::now().naive_utc()); - let to_sleep = to_sleep.to_std().map_err(|e| Error::Failed(e.into()))?; + let to_sleep = to_sleep.to_std().map_err(|e| Error::SourceError(Arc::new(e.into())))?; apalis_core::sleep(to_sleep).await; let mut data = Extensions::new(); data.insert(TaskId::new()); diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index 7676f2a..cf8fe1b 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -559,6 +559,7 @@ impl Ack Ok(()) } + // TODO: Just automatically retry e if e.starts_with("RetryError") => { let retry_job = self.scripts.retry_job.clone(); let retry_jobs_set = &self.config.scheduled_jobs_set(); @@ -983,13 +984,13 @@ impl RedisStorage { #[cfg(test)] mod tests { - use apalis_core::test_storage; + use apalis_core::generic_storage_test; use email_service::Email; use apalis_core::test_utils::apalis_test_service_fn; use apalis_core::test_utils::TestWrapper; - test_storage!({ + generic_storage_test!({ let redis_url = std::env::var("REDIS_URL").expect("No REDIS_URL is specified"); let conn = connect(redis_url).await.unwrap(); let storage = RedisStorage::new(conn); @@ -1093,7 +1094,7 @@ mod tests { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), - attempts: Attempt::new_with_value(0) + attempts: Attempt::new_with_value(0), }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/Cargo.toml b/packages/apalis-sql/Cargo.toml index 8187dd1..d548bdb 100644 --- a/packages/apalis-sql/Cargo.toml +++ b/packages/apalis-sql/Cargo.toml @@ -45,6 +45,7 @@ apalis = { path = "../../", default-features = false, features = [ "tokio-comp", ] } once_cell = "1.19.0" +apalis-sql = { path = ".", features = ["tokio-comp"] } [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/packages/apalis-sql/src/context.rs b/packages/apalis-sql/src/context.rs index 7d5f385..5aed23e 100644 --- a/packages/apalis-sql/src/context.rs +++ b/packages/apalis-sql/src/context.rs @@ -1,8 +1,8 @@ use apalis_core::error::Error; use apalis_core::task::{attempt::Attempt, task_id::TaskId}; use apalis_core::worker::WorkerId; -use serde::{Deserialize, Serialize}; use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; use std::{fmt, str::FromStr}; /// The context for a job is represented here @@ -137,8 +137,6 @@ pub enum State { Running, /// Job was done successfully Done, - /// Retry Job - Retry, /// Job has failed. Check `last_error` Failed, /// Job has been killed @@ -159,7 +157,6 @@ impl FromStr for State { "Pending" | "Latest" => Ok(State::Pending), "Running" => Ok(State::Running), "Done" => Ok(State::Done), - "Retry" => Ok(State::Retry), "Failed" => Ok(State::Failed), "Killed" => Ok(State::Killed), _ => Err(Error::MissingContext("Invalid Job state".to_string())), @@ -173,7 +170,6 @@ impl fmt::Display for State { State::Pending => write!(f, "Pending"), State::Running => write!(f, "Running"), State::Done => write!(f, "Done"), - State::Retry => write!(f, "Retry"), State::Failed => write!(f, "Failed"), State::Killed => write!(f, "Killed"), } diff --git a/packages/apalis-sql/src/lib.rs b/packages/apalis-sql/src/lib.rs index b5d2113..75da76a 100644 --- a/packages/apalis-sql/src/lib.rs +++ b/packages/apalis-sql/src/lib.rs @@ -134,9 +134,89 @@ pub(crate) fn calculate_status(res: &Result) -> State { match res { Ok(_) => State::Done, Err(e) => match &e { - _ if e.starts_with("RetryError") => State::Retry, _ if e.starts_with("AbortError") => State::Killed, _ => State::Failed, }, } } + +/// +#[macro_export] +macro_rules! sql_storage_tests { + ($setup:path, $storage_type:ty, $job_type:ty) => { + async fn setup_test_wrapper() -> TestWrapper<$storage_type, $job_type> { + TestWrapper::new_with_service( + $setup().await, + apalis_core::service_fn::service_fn(email_service::send_email), + TokioExecutor, + ) + } + + #[tokio::test] + async fn integration_test_kill_job() { + let mut storage = setup_test_wrapper().await; + + storage + .push(email_service::example_killed_email()) + .await + .unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!( + res, + Err("AbortError: Invalid character. Job killed".to_owned()) + ); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Killed); + assert!(ctx.done_at().is_some()); + assert_eq!( + ctx.last_error().clone().unwrap(), + "{\"Err\":\"AbortError: Invalid character. Job killed\"}" + ); + } + + #[tokio::test] + async fn integration_test_acknowledge_good_job() { + let mut storage = setup_test_wrapper().await; + storage + .push(email_service::example_good_email()) + .await + .unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!(res, Ok("()".to_owned())); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Done); + assert!(ctx.done_at().is_some()); + } + + #[tokio::test] + async fn integration_test_acknowledge_failed_job() { + let mut storage = setup_test_wrapper().await; + + storage + .push(email_service::example_retry_able_email()) + .await + .unwrap(); + + for index in 1..25 { + let (job_id, res) = storage.execute_next().await; + assert_eq!( + res, + Err("FailedError: Missing separator character '@'.".to_owned()) + ); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Failed); + assert_eq!(ctx.attempts().current(), index); + assert!(ctx.done_at().is_some()); + assert_eq!( + ctx.last_error().clone().unwrap(), + "{\"Err\":\"FailedError: Missing separator character '@'.\"}" + ); + } + } + }; +} diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index 8bcb7b1..7d2bc84 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -385,7 +385,7 @@ impl Backend Backend MysqlStorage { mod tests { use crate::context::State; + use crate::sql_storage_tests; use super::*; + use apalis::utils::TokioExecutor; use apalis_core::task::attempt::Attempt; use apalis_core::test_utils::DummyService; use email_service::Email; use futures::StreamExt; - use apalis_core::test_storage; + use apalis_core::generic_storage_test; use apalis_core::test_utils::apalis_test_service_fn; use apalis_core::test_utils::TestWrapper; - test_storage!({ - let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); - let pool = MySqlPool::connect(db_url).await.unwrap(); - MysqlStorage::setup(&pool) - .await - .expect("failed to migrate DB"); - let storage = MysqlStorage::new(pool); + generic_storage_test!(setup); - storage - }); + sql_storage_tests!(setup::, MysqlStorage, Email); /// migrate DB and return a storage instance. - async fn setup() -> TestWrapper, Email> { + async fn setup() -> MysqlStorage { let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); // Because connections cannot be shared across async runtime // (different runtimes are created for each test), @@ -546,7 +541,7 @@ mod tests { .expect("failed to migrate DB"); let storage = MysqlStorage::new(pool); - TestWrapper::new_with_service(storage, DummyService) + storage } /// rollback DB changes made by tests. @@ -555,7 +550,7 @@ mod tests { /// - worker identified by `worker_id` /// /// You should execute this function in the end of a test - async fn cleanup(storage: TestWrapper, Email>, worker_id: &WorkerId) { + async fn cleanup(storage: MysqlStorage, worker_id: &WorkerId) { sqlx::query("DELETE FROM jobs WHERE lock_by = ? OR status = 'Pending'") .bind(worker_id.to_string()) .execute(&storage.pool) @@ -595,7 +590,7 @@ mod tests { } async fn register_worker_at( - storage: &mut TestWrapper, Email>, + storage: &mut MysqlStorage, last_seen: DateTime, ) -> WorkerId { let worker_id = WorkerId::new("test-worker"); @@ -607,20 +602,17 @@ mod tests { worker_id } - async fn register_worker(storage: &mut TestWrapper, Email>) -> WorkerId { + async fn register_worker(storage: &mut MysqlStorage) -> WorkerId { let now = Utc::now(); register_worker_at(storage, now).await } - async fn push_email(storage: &mut TestWrapper, Email>, email: Email) { + async fn push_email(storage: &mut MysqlStorage, email: Email) { storage.push(email).await.expect("failed to push a job"); } - async fn get_job( - storage: &mut TestWrapper, Email>, - job_id: &TaskId, - ) -> Request { + async fn get_job(storage: &mut MysqlStorage, job_id: &TaskId) -> Request { storage .fetch_by_id(job_id) .await diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index cfb9133..bfe6e26 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -149,11 +149,11 @@ impl Backend Backend { if let Some(ids) = ids { let ack_ids: Vec<(String, String, String, String, u64)> = ids.iter().map(|c| { - (c.acknowledger.to_string(), c.worker.to_string(), serde_json::to_string(&c.result).unwrap(), calculate_status(&c.result).to_string(), c.attempts.current() as u64) + (c.acknowledger.to_string(), c.worker.to_string(), serde_json::to_string(&c.result).unwrap(), calculate_status(&c.result).to_string(), (c.attempts.current() + 1) as u64 ) }).collect(); let query = "UPDATE apalis.jobs SET status = Q.status, done_at = now(), lock_by = Q.lock_by, last_error = Q.result, attempts = Q.attempts FROM ( @@ -207,13 +207,7 @@ impl Backend PostgresStorage { #[cfg(test)] mod tests { use crate::context::State; + use crate::sql_storage_tests; use super::*; + use apalis::utils::TokioExecutor; use apalis_core::task::attempt::Attempt; use apalis_core::test_utils::DummyService; use chrono::Utc; use email_service::Email; - use apalis_core::test_storage; + use apalis_core::generic_storage_test; use apalis_core::test_utils::apalis_test_service_fn; use apalis_core::test_utils::TestWrapper; - test_storage!({ - let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); - let pool = PgPool::connect(&db_url).await.unwrap(); - // Because connections cannot be shared across async runtime - // (different runtimes are created for each test), - // we don't share the storage and tests must be run sequentially. - PostgresStorage::setup(&pool).await.unwrap(); - let storage = PostgresStorage::new(pool); - storage - }); + generic_storage_test!(setup); + + sql_storage_tests!(setup::, PostgresStorage, Email); /// migrate DB and return a storage instance. - async fn setup() -> TestWrapper, Email> { + async fn setup() -> PostgresStorage { let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); let pool = PgPool::connect(&db_url).await.unwrap(); // Because connections cannot be shared across async runtime @@ -640,7 +629,7 @@ mod tests { // we don't share the storage and tests must be run sequentially. PostgresStorage::setup(&pool).await.unwrap(); let storage = PostgresStorage::new(pool); - TestWrapper::new_with_service(storage, DummyService) + storage } /// rollback DB changes made by tests. @@ -649,7 +638,7 @@ mod tests { /// - worker identified by `worker_id` /// /// You should execute this function in the end of a test - async fn cleanup(storage: TestWrapper, Email>, worker_id: &WorkerId) { + async fn cleanup(storage: PostgresStorage, worker_id: &WorkerId) { let mut tx = storage .pool .acquire() @@ -676,7 +665,7 @@ mod tests { } async fn consume_one( - storage: &mut TestWrapper, Email>, + storage: &mut PostgresStorage, worker_id: &WorkerId, ) -> Request { let req = storage.fetch_next(worker_id).await; @@ -684,7 +673,7 @@ mod tests { } async fn register_worker_at( - storage: &mut TestWrapper, Email>, + storage: &mut PostgresStorage, last_seen: Timestamp, ) -> WorkerId { let worker_id = WorkerId::new("test-worker"); @@ -696,18 +685,15 @@ mod tests { worker_id } - async fn register_worker(storage: &mut TestWrapper, Email>) -> WorkerId { + async fn register_worker(storage: &mut PostgresStorage) -> WorkerId { register_worker_at(storage, Utc::now().timestamp()).await } - async fn push_email(storage: &mut TestWrapper, Email>, email: Email) { + async fn push_email(storage: &mut PostgresStorage, email: Email) { storage.push(email).await.expect("failed to push a job"); } - async fn get_job( - storage: &mut TestWrapper, Email>, - job_id: &TaskId, - ) -> Request { + async fn get_job(storage: &mut PostgresStorage, job_id: &TaskId) -> Request { storage .fetch_by_id(job_id) .await diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 2a1445a..05f2c17 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -188,7 +188,6 @@ impl SqliteStorage { let config = self.config.clone(); try_stream! { loop { - apalis_core::sleep(interval).await; let tx = pool.clone(); let mut tx = tx.acquire().await?; let job_type = &config.namespace; @@ -217,6 +216,7 @@ impl SqliteStorage { } } }; + apalis_core::sleep(interval).await; } } } @@ -462,7 +462,7 @@ impl Backend Ack for SqliteStorage { .bind(res.worker.to_string()) .bind(result) .bind(calculate_status(&res.result).to_string()) - .bind(res.attempts.current() as i64) + .bind(res.attempts.current() as i64 + 1) .execute(&pool) .await?; Ok(()) @@ -503,29 +503,27 @@ impl Ack for SqliteStorage { mod tests { use crate::context::State; + use crate::sql_storage_tests; use super::*; + use apalis::utils::TokioExecutor; + use apalis_core::service_fn::service_fn; use apalis_core::task::attempt::Attempt; use apalis_core::test_utils::DummyService; use chrono::Utc; + use email_service::example_good_email; use email_service::Email; use futures::StreamExt; - use apalis_core::test_storage; + use apalis_core::generic_storage_test; use apalis_core::test_utils::apalis_test_service_fn; use apalis_core::test_utils::TestWrapper; - test_storage!({ - let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - SqliteStorage::setup(&pool) - .await - .expect("failed to migrate DB"); - let storage = SqliteStorage::new(pool); - storage - }); + generic_storage_test!(setup); + sql_storage_tests!(setup::, SqliteStorage, Email); /// migrate DB and return a storage instance. - async fn setup() -> TestWrapper, Email> { + async fn setup() -> SqliteStorage { // Because connections cannot be shared across async runtime // (different runtimes are created for each test), // we don't share the storage and tests must be run sequentially. @@ -533,9 +531,9 @@ mod tests { SqliteStorage::setup(&pool) .await .expect("failed to migrate DB"); - let storage = SqliteStorage::::new(pool); + let storage = SqliteStorage::::new(pool); - TestWrapper::new_with_service(storage, DummyService) + storage } #[tokio::test] @@ -553,16 +551,8 @@ mod tests { assert_eq!(len, 1); } - fn example_email() -> Email { - Email { - subject: "Test Subject".to_string(), - to: "example@postgres".to_string(), - text: "Some Text".to_string(), - } - } - async fn consume_one( - storage: &mut TestWrapper, Email>, + storage: &mut SqliteStorage, worker_id: &WorkerId, ) -> Request { let mut stream = storage @@ -576,10 +566,7 @@ mod tests { .expect("no job is pending") } - async fn register_worker_at( - storage: &mut TestWrapper, Email>, - last_seen: i64, - ) -> WorkerId { + async fn register_worker_at(storage: &mut SqliteStorage, last_seen: i64) -> WorkerId { let worker_id = WorkerId::new("test-worker"); storage @@ -589,18 +576,15 @@ mod tests { worker_id } - async fn register_worker(storage: &mut TestWrapper, Email>) -> WorkerId { + async fn register_worker(storage: &mut SqliteStorage) -> WorkerId { register_worker_at(storage, Utc::now().timestamp()).await } - async fn push_email(storage: &mut TestWrapper, Email>, email: Email) { + async fn push_email(storage: &mut SqliteStorage, email: Email) { storage.push(email).await.expect("failed to push a job"); } - async fn get_job( - storage: &mut TestWrapper, Email>, - job_id: &TaskId, - ) -> Request { + async fn get_job(storage: &mut SqliteStorage, job_id: &TaskId) -> Request { storage .fetch_by_id(job_id) .await @@ -611,10 +595,12 @@ mod tests { #[tokio::test] async fn test_consume_last_pushed_job() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + push_email(&mut storage, example_good_email()).await; + let len = storage.len().await.expect("Could not fetch the jobs count"); + assert_eq!(len, 1); + let job = consume_one(&mut storage, &worker_id).await; let ctx = job.get::().unwrap(); assert_eq!(*ctx.status(), State::Running); @@ -625,20 +611,20 @@ mod tests { #[tokio::test] async fn test_acknowledge_job() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; - let worker_id = register_worker(&mut storage).await; + push_email(&mut storage, example_good_email()).await; let job = consume_one(&mut storage, &worker_id).await; - let ctx = job.get::().unwrap(); - let job_id = ctx.id(); + let ctx = job.get::(); + assert!(ctx.is_some()); + let job_id = ctx.unwrap().id(); storage .ack(AckResponse { acknowledger: job_id.clone(), result: Ok("Success".to_string()), worker: worker_id.clone(), - attempts: Attempt::new_with_value(0), + attempts: Attempt::new_with_value(1), }) .await .expect("failed to acknowledge the job"); @@ -653,7 +639,7 @@ mod tests { async fn test_kill_job() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; + push_email(&mut storage, example_good_email()).await; let worker_id = register_worker(&mut storage).await; @@ -676,7 +662,7 @@ mod tests { async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; + push_email(&mut storage, example_good_email()).await; let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); @@ -692,18 +678,18 @@ mod tests { let job_id = ctx.id(); let job = get_job(&mut storage, job_id).await; let ctx = job.get::().unwrap(); - assert_eq!(*ctx.status(), State::Pending); + assert_eq!(*ctx.status(), State::Running); assert!(ctx.done_at().is_none()); - assert!(ctx.lock_by().is_none()); - assert!(ctx.lock_at().is_none()); - assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_string())); + assert!(ctx.lock_by().is_some()); + assert!(ctx.lock_at().is_some()); + assert_eq!(*ctx.last_error(), Some("".to_string())); //TODO: Fix this } #[tokio::test] async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { let mut storage = setup().await; - push_email(&mut storage, example_email()).await; + push_email(&mut storage, example_good_email()).await; let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); let worker_id = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; @@ -722,3 +708,99 @@ mod tests { assert_eq!(*ctx.lock_by(), Some(worker_id)); } } + +#[cfg(test)] +mod backend_tests { + use std::ops::DerefMut; + + use apalis::utils::TokioExecutor; + use apalis_core::{service_fn::service_fn, storage::Storage, test_utils::TestWrapper}; + use email_service::{ + example_good_email, example_killed_email, example_retry_able_email, Email, + }; + use sqlx::SqlitePool; + + use crate::context::{SqlContext, State}; + + use super::SqliteStorage; + + /// migrate DB and return a storage instance. + async fn setup() -> SqliteStorage { + // Because connections cannot be shared across async runtime + // (different runtimes are created for each test), + // we don't share the storage and tests must be run sequentially. + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + SqliteStorage::setup(&pool) + .await + .expect("failed to migrate DB"); + let storage = SqliteStorage::::new(pool); + + storage + } + + async fn setup_test_wrapper() -> TestWrapper, Email> { + TestWrapper::new_with_service( + setup().await, + service_fn(email_service::send_email), + TokioExecutor, + ) + } + + #[tokio::test] + async fn test_kill_job() { + let mut storage = setup_test_wrapper().await; + + storage.push(example_killed_email()).await.unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!( + res, + Err("AbortError: Invalid character. Job killed".to_owned()) + ); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Killed); + assert!(ctx.done_at().is_some()); + assert_eq!( + ctx.last_error().clone().unwrap(), + "{\"Err\":\"AbortError: Invalid character. Job killed\"}" + ); + } + + #[tokio::test] + async fn test_acknowledge_good_job() { + let mut storage = setup_test_wrapper().await; + storage.push(example_good_email()).await.unwrap(); + + let (job_id, res) = storage.execute_next().await; + assert_eq!(res, Ok("()".to_owned())); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Done); + assert!(ctx.done_at().is_some()); + } + + #[tokio::test] + async fn test_acknowledge_failed_job() { + let mut storage = setup_test_wrapper().await; + + storage.push(example_retry_able_email()).await.unwrap(); + + for index in 1..25 { + let (job_id, res) = storage.execute_next().await; + assert_eq!( + res, + Err("FailedError: Missing separator character '@'.".to_owned()) + ); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let ctx = job.get::().unwrap(); + assert_eq!(*ctx.status(), State::Failed); + assert_eq!(ctx.attempts().current(), index); + assert!(ctx.done_at().is_some()); + assert_eq!( + ctx.last_error().clone().unwrap(), + "{\"Err\":\"FailedError: Missing separator character '@'.\"}" + ); + } + } +} diff --git a/src/layers/catch_panic/mod.rs b/src/layers/catch_panic/mod.rs index c9c1791..6fe9587 100644 --- a/src/layers/catch_panic/mod.rs +++ b/src/layers/catch_panic/mod.rs @@ -2,6 +2,7 @@ use std::fmt; use std::future::Future; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use apalis_core::error::Error; @@ -100,10 +101,10 @@ where } else { "Unknown panic".to_string() }; - Poll::Ready(Err(Error::Failed(Box::new(PanicError( + Poll::Ready(Err(Error::Failed(Arc::new(Box::new(PanicError( panic_info, Backtrace::new(), - ))))) + )))))) } } }