Skip to content

Commit

Permalink
Feature: Save results for storages (#369)
Browse files Browse the repository at this point in the history
* Feature: Save results for storages

Currently just the status is stored, this PR adds the ability to save the result

* fix: result from storage

* fix: kill and abort issue
  • Loading branch information
geofmureithi authored Jul 13, 2024
1 parent 97ff348 commit 4ad94d2
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 108 deletions.
2 changes: 1 addition & 1 deletion examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() -> Result<()> {
produce_jobs(storage.clone()).await?;

let worker = WorkerBuilder::new("rango-tango")
.chain(|svc| svc.map_err(|e| Error::Failed(e)))
.chain(|svc| svc.map_err(Error::Failed))
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
Expand Down
16 changes: 10 additions & 6 deletions packages/apalis-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@ pub type BoxDynError = Box<dyn StdError + 'static + Send + Sync>;
#[non_exhaustive]
pub enum Error {
/// An error occurred during execution.
#[error("Task Failed: {0}")]
#[error("FailedError: {0}")]
Failed(#[source] BoxDynError),

/// A generic IO error
#[error("IO error: {0}")]
#[error("IoError: {0}")]
Io(#[from] std::io::Error),

/// Missing some context and yet it was requested during execution.
#[error("MissingContext: {0}")]
InvalidContext(String),
#[error("MissingContextError: {0}")]
MissingContext(String),

/// Execution was aborted
#[error("Execution was aborted")]
#[error("AbortError")]
Abort,

/// Execution failed and job will be retried
#[error("RetryError: {0}")]
Retry(#[source] BoxDynError),

/// Encountered an error during worker execution
#[error("Encountered an error during worker execution")]
#[error("WorkerError: {0}")]
WorkerError(WorkerError),

#[doc(hidden)]
Expand Down
47 changes: 26 additions & 21 deletions packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::{request::Request, worker::WorkerId};
use futures::channel::mpsc::{SendError, Sender};
use futures::SinkExt;
use futures::{future::BoxFuture, Future, FutureExt};
use std::marker::PhantomData;
use std::{fmt, sync::Arc};
pub use tower::{
layer::layer_fn, layer::util::Identity, util::BoxCloneService, Layer, Service, ServiceBuilder,
};

use crate::{request::Request, worker::WorkerId};
use futures::{future::BoxFuture, Future, FutureExt};

/// A generic layer that has been stripped off types.
/// This is returned by a [crate::Backend] and can be used to customize the middleware of the service consuming tasks
pub struct CommonLayer<In, T, U, E> {
Expand Down Expand Up @@ -154,7 +153,9 @@ pub mod extensions {
}

/// A trait for acknowledging successful processing
pub trait Ack<J> {
/// This trait is called even when a task fails.
/// This is a way of a [`Backend`] to save the result of a job or message
pub trait Ack<Task> {
/// The data to fetch from context to allow acknowledgement
type Acknowledger;
/// The error returned by the ack
Expand All @@ -167,21 +168,21 @@ pub trait Ack<J> {
}

/// ACK response
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct AckResponse<A> {
/// The worker id
pub worker: WorkerId,
/// The acknowledger
pub acknowledger: A,
/// The stringified result
pub result: String,
pub result: Result<String, String>,
}

impl<A: fmt::Display> AckResponse<A> {
/// Output a json for the response
pub fn to_json(&self) -> String {
format!(
r#"{{"worker": "{}", "acknowledger": "{}", "result": "{}"}}"#,
r#"{{"worker": "{}", "acknowledger": "{}", "result": "{:?}"}}"#,
self.worker, self.acknowledger, self.result
)
}
Expand Down Expand Up @@ -260,15 +261,15 @@ impl<Sv: Clone, A: Clone, J> Clone for AckService<Sv, A, J> {
}
}

impl<SV, A, J> Service<Request<J>> for AckService<SV, A, J>
impl<SV, A, T> Service<Request<T>> for AckService<SV, A, T>
where
SV: Service<Request<J>> + Send + Sync + 'static,
SV: Service<Request<T>> + Send + Sync + 'static,
SV::Error: std::error::Error + Send + Sync + 'static,
<SV as Service<Request<J>>>::Future: std::marker::Send + 'static,
A: Ack<J> + Send + 'static + Clone + Send + Sync,
J: 'static,
<SV as Service<Request<J>>>::Response: std::marker::Send + fmt::Debug + Sync,
<A as Ack<J>>::Acknowledger: Sync + Send + Clone,
<SV as Service<Request<T>>>::Future: std::marker::Send + 'static,
A: Ack<T> + Send + 'static + Clone + Send + Sync,
T: 'static,
<SV as Service<Request<T>>>::Response: std::marker::Send + fmt::Debug + Sync,
<A as Ack<T>>::Acknowledger: Sync + Send + Clone,
{
type Response = SV::Response;
type Error = SV::Error;
Expand All @@ -281,29 +282,33 @@ where
self.service.poll_ready(cx)
}

fn call(&mut self, request: Request<J>) -> Self::Future {
fn call(&mut self, request: Request<T>) -> Self::Future {
let mut ack = self.ack.clone();
let worker_id = self.worker_id.clone();
let data = request.get::<<A as Ack<J>>::Acknowledger>().cloned();
let data = request.get::<<A as Ack<T>>::Acknowledger>().cloned();
let fut = self.service.call(request);
let fut_with_ack = async move {
let res = fut.await;
let result = res
.as_ref()
.map(|ok| format!("{ok:?}"))
.map_err(|e| e.to_string());
if let Some(task_id) = data {
if let Err(_e) = ack
.ack(AckResponse {
worker: worker_id,
acknowledger: task_id,
result: format!("{res:?}"),
result,
})
.await
{
// tracing::warn!("Acknowledgement Failed: {}", e);
// try get monitor, and emit
// TODO: Implement tracing in apalis core
// tracing::error!("Acknowledgement Failed: {}", e);
}
} else {
// tracing::warn!(
// tracing::error!(
// "Acknowledgement could not be called due to missing ack data in context : {}",
// &std::any::type_name::<<A as Ack<J>>::Acknowledger>()
// &std::any::type_name::<<A as Ack<T>>::Acknowledger>()
// );
}
res
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
-- KEYS[1]: this consumer's inflight set
-- KEYS[2]: the done jobs set
-- KEYS[3]: the job data hash

-- ARGV[1]: the job ID
-- ARGV[2]: the current time
-- ARGV[3]: the result of the job

-- Returns: bool

-- Remove the job from this consumer's inflight set
local removed = redis.call("srem", KEYS[1], ARGV[1])

local ns = "::result"
if removed == 1 then
-- Push the job on to the done jobs set
redis.call("zadd", KEYS[2], ARGV[2], ARGV[1])
redis.call("hmset", KEYS[3].. ns, ARGV[1], ARGV[3] )
return true
end

Expand Down
16 changes: 7 additions & 9 deletions packages/apalis-redis/lua/kill_job.lua
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
-- KEYS[1]: this consumer's inflight set
-- KEYS[2]: the dead jobs set
-- KEYS[3]: the job data hash

-- ARGV[1]: the job ID
-- ARGV[2]: the current time
-- ARGV[3]: the serialized job data

-- ARGV[3]: the result of the job
-- Returns: nil

-- Remove the job from this consumer's inflight set
local removed = redis.call("srem", KEYS[1], ARGV[1])

if removed == 1 then
-- Push the job on to the dead jobs set
redis.call("zadd", KEYS[2], ARGV[2], ARGV[1])
-- Push the job on to the dead jobs set
redis.call("zadd", KEYS[2], ARGV[2], ARGV[1])

-- Reset the job data
redis.call("hset", KEYS[3], ARGV[1], ARGV[3])
-- Save the result of the job
local ns = "::result"
redis.call("hmset", KEYS[3] .. ns, ARGV[1], ARGV[3])

return 1
return 1
end

return 0
11 changes: 9 additions & 2 deletions packages/apalis-redis/lua/retry_job.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

-- ARGV[1]: the job ID
-- ARGV[2]: the time at which to retry
-- ARGV[3]: the serialized job data
-- ARGV[3]: the result of the job

-- Returns: nil

Expand All @@ -15,8 +15,15 @@ if removed == 1 then
-- Push the job on to the scheduled set
redis.call("zadd", KEYS[2], ARGV[2], ARGV[1])

local job = redis.call('HGET', KEYS[3], ARGV[1])

-- Reset the job data
redis.call("hset", KEYS[3], ARGV[1], ARGV[3])
redis.call("hset", KEYS[3], ARGV[1], job)

-- Save the result of the job
local ns = "::result"
redis.call("hmset", KEYS[3].. ns, ARGV[1], ARGV[4] )

end

return removed
95 changes: 62 additions & 33 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct RedisQueueInfo {

#[derive(Clone, Debug)]
struct RedisScript {
ack_job: Script,
done_job: Script,
enqueue_scheduled: Script,
get_jobs: Script,
kill_job: Script,
Expand Down Expand Up @@ -415,7 +415,7 @@ impl<T: Serialize + DeserializeOwned, Conn> RedisStorage<T, Conn> {
config,
codec: Arc::new(Box::new(codec)),
scripts: RedisScript {
ack_job: redis::Script::new(include_str!("../lua/ack_job.lua")),
done_job: redis::Script::new(include_str!("../lua/done_job.lua")),
push_job: redis::Script::new(include_str!("../lua/push_job.lua")),
retry_job: redis::Script::new(include_str!("../lua/retry_job.lua")),
enqueue_scheduled: redis::Script::new(include_str!(
Expand Down Expand Up @@ -535,19 +535,59 @@ impl<T: Sync + Send, Conn: ConnectionLike + Send + Sync + 'static> Ack<T>
type Acknowledger = TaskId;
type Error = RedisError;
async fn ack(&mut self, res: AckResponse<Self::Acknowledger>) -> Result<(), RedisError> {
let ack_job = self.scripts.ack_job.clone();
let inflight_set = format!("{}:{}", self.config.inflight_jobs_set(), res.worker);
let done_jobs_set = &self.config.done_jobs_set();

let now: i64 = res.acknowledger.inner().timestamp_ms().try_into().unwrap();
let now: i64 = Utc::now().timestamp();

ack_job
.key(inflight_set)
.key(done_jobs_set)
.arg(res.acknowledger.to_string())
.arg(now)
.invoke_async(&mut self.conn)
.await
match res.result {
Ok(success_res) => {
let done_job = self.scripts.done_job.clone();
let done_jobs_set = &self.config.done_jobs_set();
done_job
.key(inflight_set)
.key(done_jobs_set)
.key(self.config.job_data_hash())
.arg(res.acknowledger.to_string())
.arg(now)
.arg(success_res)
.invoke_async(&mut self.conn)
.await
}
Err(e) => match e {
e if e.contains("BackoffRetry") => {
//do nothing, should be handled by BackoffLayer
Ok(())
}

e if e.starts_with("RetryError") => {
let retry_job = self.scripts.retry_job.clone();
let retry_jobs_set = &self.config.scheduled_jobs_set();
retry_job
.key(inflight_set)
.key(retry_jobs_set)
.key(self.config.job_data_hash())
.arg(res.acknowledger.to_string())
.arg(now)
.arg(e)
.invoke_async(&mut self.conn)
.await
}

_ => {
let kill_job = self.scripts.kill_job.clone();
let kill_jobs_set = &self.config.dead_jobs_set();
kill_job
.key(inflight_set)
.key(kill_jobs_set)
.key(self.config.job_data_hash())
.arg(res.acknowledger.to_string())
.arg(now)
.arg(e)
.invoke_async(&mut self.conn)
.await
}
},
}
}
}

Expand Down Expand Up @@ -862,27 +902,16 @@ impl<T, Conn: ConnectionLike + Send + Sync + 'static> RedisStorage<T, Conn> {
let current_worker_id = format!("{}:{}", self.config.inflight_jobs_set(), worker_id);
let job_data_hash = self.config.job_data_hash();
let dead_jobs_set = self.config.dead_jobs_set();
let fetch_job = self.fetch_by_id(task_id);
let now: i64 = Utc::now().timestamp();
let res = fetch_job.await?;
match res {
Some(job) => {
let data = self
.codec
.encode(&job.try_into()?)
.map_err(|e| (ErrorKind::IoError, "Encode error", e.to_string()))?;
kill_job
.key(current_worker_id)
.key(dead_jobs_set)
.key(job_data_hash)
.arg(task_id.to_string())
.arg(now)
.arg(data)
.invoke_async(&mut self.conn)
.await
}
None => Err(RedisError::from((ErrorKind::ResponseError, "Id not found"))),
}
kill_job
.key(current_worker_id)
.key(dead_jobs_set)
.key(job_data_hash)
.arg(task_id.to_string())
.arg(now)
.arg("AbortError")
.invoke_async(&mut self.conn)
.await
}

/// Required to add scheduled jobs to the active set
Expand Down Expand Up @@ -1051,7 +1080,7 @@ mod tests {
storage
.ack(AckResponse {
acknowledger: job_id.clone(),
result: "Success".to_string(),
result: Ok("Success".to_string()),
worker: worker_id.clone(),
})
.await
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl FromStr for State {
"Retry" => Ok(State::Retry),
"Failed" => Ok(State::Failed),
"Killed" => Ok(State::Killed),
_ => Err(Error::InvalidContext("Invalid Job state".to_string())),
_ => Err(Error::MissingContext("Invalid Job state".to_string())),
}
}
}
Expand Down
Loading

0 comments on commit 4ad94d2

Please sign in to comment.