Skip to content

Commit

Permalink
fix: introduce testwrapper and add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Jul 18, 2024
1 parent 4a4eb20 commit d19f46c
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 222 deletions.
3 changes: 2 additions & 1 deletion examples/email-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ apalis = { path = "../../", default-features = false }
futures-util = "0.3.0"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
log = "0.4"
log = "0.4"
email_address = "0.2.5"
43 changes: 41 additions & 2 deletions examples/email-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::{str::FromStr, sync::Arc};

use apalis::prelude::*;
use email_address::EmailAddress;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand All @@ -7,8 +11,43 @@ pub struct Email {
pub text: String,
}

pub async fn send_email(job: Email) {
log::info!("Attempting to send email to {}", job.to);
pub async fn send_email(job: Email) -> Result<(), Error> {
let validation = EmailAddress::from_str(&job.to);
match validation {
Ok(email) => {
log::info!("Attempting to send email to {}", email.as_str());
Ok(())
}
Err(email_address::Error::InvalidCharacter) => {
log::error!("Killed send email job. Invalid character {}", job.to);
Err(Error::Abort(String::from("Invalid character. Job killed")))
}
Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))),
}
}

pub fn example_good_email() -> Email {
Email {
subject: "Test Subject".to_string(),
to: "example@gmail.com".to_string(),
text: "Some Text".to_string(),
}
}

pub fn example_killed_email() -> Email {
Email {
subject: "Test Subject".to_string(),
to: "example@©.com".to_string(), // killed because it has © which is invalid
text: "Some Text".to_string(),
}
}

pub fn example_retry_able_email() -> Email {
Email {
subject: "Test Subject".to_string(),
to: "example".to_string(),
text: "Some Text".to_string(),
}
}

pub const FORM_HTML: &str = r#"
Expand Down
6 changes: 3 additions & 3 deletions examples/redis-with-msg-pack/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use apalis::prelude::*;
Expand All @@ -13,11 +13,11 @@ struct MessagePack;
impl<T: Serialize + DeserializeOwned> Codec<T, Vec<u8>> for MessagePack {
type Error = Error;
fn encode(&self, input: &T) -> Result<Vec<u8>, Self::Error> {
rmp_serde::to_vec(input).map_err(|e| Error::SourceError(Box::new(e)))
rmp_serde::to_vec(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}

fn decode(&self, compact: &Vec<u8>) -> Result<T, Self::Error> {
rmp_serde::from_slice(compact).map_err(|e| Error::SourceError(Box::new(e)))
rmp_serde::from_slice(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() -> Result<()> {
produce_jobs(storage.clone()).await?;

let worker = WorkerBuilder::new("rango-tango")
.chain(|svc| svc.map_err(Error::Failed))
.chain(|svc| svc.map_err(|e| Error::Failed(Arc::new(e))))
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ optional = true


[features]
default = []
default = ["test-utils"]
docsrs = ["document-features"]
sleep = ["futures-timer"]
json = ["serde_json"]
test-utils = []

[package.metadata.docs.rs]
# defines the configuration attribute `docsrs`
Expand Down
15 changes: 9 additions & 6 deletions packages/apalis-core/src/codec/json.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{error::Error, Codec};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
Expand All @@ -9,32 +11,33 @@ pub struct JsonCodec;
impl<T: Serialize + DeserializeOwned> Codec<T, Vec<u8>> for JsonCodec {
type Error = Error;
fn encode(&self, input: &T) -> Result<Vec<u8>, Self::Error> {
serde_json::to_vec(input).map_err(|e| Error::SourceError(Box::new(e)))
serde_json::to_vec(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}

fn decode(&self, compact: &Vec<u8>) -> Result<T, Self::Error> {
serde_json::from_slice(compact).map_err(|e| Error::SourceError(Box::new(e)))
serde_json::from_slice(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}
}

impl<T: Serialize + DeserializeOwned> Codec<T, String> for JsonCodec {
type Error = Error;
fn encode(&self, input: &T) -> Result<String, Self::Error> {
serde_json::to_string(input).map_err(|e| Error::SourceError(Box::new(e)))
serde_json::to_string(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}

fn decode(&self, compact: &String) -> Result<T, Self::Error> {
serde_json::from_str(compact).map_err(|e| Error::SourceError(Box::new(e)))
serde_json::from_str(compact).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}
}

impl<T: Serialize + DeserializeOwned> Codec<T, Value> for JsonCodec {
type Error = Error;
fn encode(&self, input: &T) -> Result<Value, Self::Error> {
serde_json::to_value(input).map_err(|e| Error::SourceError(Box::new(e)))
serde_json::to_value(input).map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}

fn decode(&self, compact: &Value) -> Result<T, Self::Error> {
serde_json::from_value(compact.clone()).map_err(|e| Error::SourceError(Box::new(e)))
serde_json::from_value(compact.clone())
.map_err(|e| Error::SourceError(Arc::new(Box::new(e))))
}
}
20 changes: 8 additions & 12 deletions packages/apalis-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::error::Error as StdError;
use std::{error::Error as StdError, sync::Arc};
use thiserror::Error;

use crate::worker::WorkerError;
Expand All @@ -7,28 +7,24 @@ use crate::worker::WorkerError;
pub type BoxDynError = Box<dyn StdError + 'static + Send + Sync>;

/// Represents a general error returned by a task or by internals of the platform
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
/// An error occurred during execution.
#[error("FailedError: {0}")]
Failed(#[source] BoxDynError),
Failed(#[source] Arc<BoxDynError>),

/// A generic IO error
#[error("IoError: {0}")]
Io(#[from] std::io::Error),
Io(#[from] Arc<std::io::Error>),

/// Missing some context and yet it was requested during execution.
#[error("MissingContextError: {0}")]
MissingContext(String),

/// Execution was aborted
#[error("AbortError")]
Abort,

/// Execution failed and job will be retried
#[error("RetryError: {0}")]
Retry(#[source] BoxDynError),
#[error("AbortError: {0}")]
Abort(String),

/// Encountered an error during worker execution
#[error("WorkerError: {0}")]
Expand All @@ -38,11 +34,11 @@ pub enum Error {
/// Encountered an error during service execution
/// This should not be used inside a task function
#[error("Encountered an error during service execution")]
ServiceError(#[source] BoxDynError),
ServiceError(#[source] Arc<BoxDynError>),

#[doc(hidden)]
/// Encountered an error during service execution
/// This should not be used inside a task function
#[error("Encountered an error during streaming")]
SourceError(#[source] BoxDynError),
SourceError(#[source] Arc<BoxDynError>),
}
Loading

0 comments on commit d19f46c

Please sign in to comment.