Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add recovery of abandoned jobs to backend heartbeats #453

Merged
merged 9 commits into from
Nov 22, 2024
Merged
70 changes: 62 additions & 8 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::{Backend, Codec};
use chrono::Utc;
use chrono::{DateTime, Utc};
use futures::channel::mpsc::{self, Sender};
use futures::{select, FutureExt, SinkExt, StreamExt, TryFutureExt};
use log::*;
Expand Down Expand Up @@ -114,6 +114,7 @@
max_retries: usize,
keep_alive: Duration,
enqueue_scheduled: Duration,
reenqueue_orphaned_after: Duration,
namespace: String,
}

Expand All @@ -125,6 +126,7 @@
max_retries: 5,
keep_alive: Duration::from_secs(30),
enqueue_scheduled: Duration::from_secs(30),
reenqueue_orphaned_after: Duration::from_secs(300),
namespace: String::from("apalis_redis"),
}
}
Expand Down Expand Up @@ -277,6 +279,25 @@
/// Returns the Redis key for this queue's signal list, built by substituting
/// this config's namespace into the `SIGNAL_LIST` key template.
pub fn signal_list(&self) -> String {
SIGNAL_LIST.replace("{queue}", &self.namespace)
}

/// Gets the `reenqueue_orphaned_after` duration: how long a job may sit with a
/// dead or unresponsive worker before it is considered orphaned and re-enqueued.
pub fn reenqueue_orphaned_after(&self) -> Duration {
self.reenqueue_orphaned_after
}

/// Gets a mutable reference to `reenqueue_orphaned_after`, allowing in-place
/// adjustment of the orphaned-job timeout on an existing config.
pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
&mut self.reenqueue_orphaned_after
}

/// Sets the orphaned-job timeout (builder style: consumes and returns `Self`).
///
/// Occasionally some workers die, or abandon jobs because of panics.
/// This is the time a task is allowed to run before it is considered
/// abandoned and returned to the queue.
///
/// Defaults to 5 minutes.
pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
self.reenqueue_orphaned_after = after;
self
}
}

/// Represents a [Storage] that uses Redis for storage.
Expand Down Expand Up @@ -400,6 +421,9 @@
let config = self.config.clone();
let stream: RequestStream<Request<T, RedisContext>> = Box::pin(rx);
let heartbeat = async move {
let mut reenqueue_orphaned_stm =
apalis_core::interval::interval(config.poll_interval).fuse();

let mut keep_alive_stm = apalis_core::interval::interval(config.keep_alive).fuse();

let mut enqueue_scheduled_stm =
Expand Down Expand Up @@ -448,6 +472,13 @@
}
}
}
_ = reenqueue_orphaned_stm.next() => {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = self.reenqueue_orphaned((config.buffer_size * 10) as i32, dead_since).await {
error!("ReenqueueOrphanedError: {}", e);
}
}
};
}
};
Expand Down Expand Up @@ -621,7 +652,7 @@
type Error = RedisError;
type Context = RedisContext;

async fn push_request(

Check warning on line 655 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
req: Request<T, RedisContext>,
) -> Result<Parts<Self::Context>, RedisError> {
Expand All @@ -644,7 +675,7 @@
Ok(req.parts)
}

async fn schedule_request(

Check warning on line 678 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
req: Request<Self::Job, RedisContext>,
on: i64,
Expand Down Expand Up @@ -707,7 +738,7 @@
Ok(())
}

async fn reschedule(

Check warning on line 741 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
job: Request<T, RedisContext>,
wait: Duration,
Expand Down Expand Up @@ -766,7 +797,7 @@
C: Codec<Compact = Vec<u8>> + Send + 'static,
{
/// Attempt to retry a job
pub async fn retry(&mut self, worker_id: &WorkerId, task_id: &TaskId) -> Result<i32, RedisError>

Check warning on line 800 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
where
T: Send + DeserializeOwned + Serialize + Unpin + Sync + 'static,
{
Expand Down Expand Up @@ -875,17 +906,21 @@
.invoke_async(&mut self.conn)
.await
}
/// Re-enqueue some jobs that might be orphaned.
/// Re-enqueue some jobs that might be orphaned after a number of seconds
pub async fn reenqueue_orphaned(
&mut self,
count: usize,
dead_since: i64,
count: i32,
dead_since: DateTime<Utc>,
) -> Result<usize, RedisError> {
let reenqueue_orphaned = self.scripts.reenqueue_orphaned.clone();
let consumers_set = self.config.consumers_set();
let active_jobs_list = self.config.active_jobs_list();
let signal_list = self.config.signal_list();

let now = Utc::now();
let duration = now.signed_duration_since(dead_since);
let dead_since = duration.num_seconds();

let res: Result<usize, RedisError> = reenqueue_orphaned
.key(consumers_set)
.key(active_jobs_list)
Expand Down Expand Up @@ -1043,11 +1078,21 @@

let worker_id = register_worker_at(&mut storage).await;

let _job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
.reenqueue_orphaned(5, 300)
.reenqueue_orphaned(1, dead_since)
.await
.expect("failed to reenqueue_orphaned");
let job = get_job(&mut storage, &job.parts.task_id).await;
let ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Pending);
// assert!(ctx.done_at().is_none());
assert!(ctx.lock_by.is_none());
// assert!(ctx.lock_at().is_none());
// assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned()));
// TODO: Redis should store context aside
// assert_eq!(job.parts.attempt.current(), 1);
}

#[tokio::test]
Expand All @@ -1058,10 +1103,19 @@

let worker_id = register_worker_at(&mut storage).await;

let _job = consume_one(&mut storage, &worker_id).await;
let job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
.reenqueue_orphaned(5, 300)
.reenqueue_orphaned(1, dead_since)
.await
.expect("failed to reenqueue_orphaned");
let job = get_job(&mut storage, &job.parts.task_id).await;
let _ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Running);
// TODO: update redis context
// assert_eq!(ctx.lock_by, Some(worker_id));
// assert!(ctx.lock_at().is_some());
// assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 0);
}
}
21 changes: 21 additions & 0 deletions packages/apalis-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Config {
keep_alive: Duration,
buffer_size: usize,
poll_interval: Duration,
reenqueue_orphaned_after: Duration,
namespace: String,
}

Expand All @@ -54,6 +55,7 @@ impl Default for Config {
keep_alive: Duration::from_secs(30),
buffer_size: 10,
poll_interval: Duration::from_millis(100),
reenqueue_orphaned_after: Duration::from_secs(300), // 5 minutes
namespace: String::from("apalis::sql"),
}
}
Expand Down Expand Up @@ -131,6 +133,25 @@ impl Config {
/// Gets a mutable reference to the namespace, allowing it to be
/// changed in place on an existing config.
pub fn namespace_mut(&mut self) -> &mut String {
&mut self.namespace
}

/// Gets the `reenqueue_orphaned_after` duration: how long a job may sit with a
/// dead or unresponsive worker before it is considered orphaned and re-enqueued.
pub fn reenqueue_orphaned_after(&self) -> Duration {
self.reenqueue_orphaned_after
}

/// Gets a mutable reference to `reenqueue_orphaned_after`, allowing in-place
/// adjustment of the orphaned-job timeout on an existing config.
pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
&mut self.reenqueue_orphaned_after
}

/// Sets the orphaned-job timeout (builder style: consumes and returns `Self`).
///
/// Occasionally some workers die, or abandon jobs because of panics.
/// This is the time a task is allowed to run before it is considered
/// abandoned and returned to the queue.
///
/// Defaults to 5 minutes.
pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
self.reenqueue_orphaned_after = after;
self
}
}

/// Calculates the status from a result
Expand Down
76 changes: 48 additions & 28 deletions packages/apalis-sql/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ where
impl<Req, Res, C> Backend<Request<Req, SqlContext>, Res> for MysqlStorage<Req, C>
where
Req: Serialize + DeserializeOwned + Sync + Send + Unpin + 'static,
C: Debug + Codec<Compact = Value> + Clone + Send + 'static,
C: Debug + Codec<Compact = Value> + Clone + Send + 'static + Sync,
{
type Stream = BackendStream<RequestStream<Request<Req, SqlContext>>>;

Expand All @@ -387,6 +387,7 @@ where
let pool = self.pool.clone();
let ack_notify = self.ack_notify.clone();
let mut hb_storage = self.clone();
let requeue_storage = self.clone();
let stream = self
.stream_jobs(&worker, config.poll_interval, config.buffer_size)
.map_err(|e| Error::SourceError(Arc::new(Box::new(e))));
Expand All @@ -400,15 +401,14 @@ where
.await
{
for (ctx, res) in ids {
let query = "UPDATE jobs SET status = ?, done_at = now(), last_error = ?, attempts = ? WHERE id = ? AND lock_by = ?";
let query = "UPDATE jobs SET status = ?, done_at = now(), last_error = ? WHERE id = ? AND lock_by = ?";
let query = sqlx::query(query);
let query = query
.bind(calculate_status(&res.inner).to_string())
.bind(
serde_json::to_string(&res.inner.as_ref().map_err(|e| e.to_string()))
.unwrap(),
)
.bind(res.attempt.current() as u64 + 1)
.bind(res.task_id.to_string())
.bind(ctx.lock_by().as_ref().unwrap().to_string());
if let Err(e) = query.execute(&pool).await {
Expand All @@ -429,10 +429,23 @@ where
apalis_core::sleep(config.keep_alive).await;
}
};
let reenqueue_beat = async move {
loop {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = requeue_storage
.reenqueue_orphaned(config.buffer_size.try_into().unwrap(), dead_since)
.await
{
error!("ReenqueueOrphaned failed: {e}");
}
apalis_core::sleep(config.poll_interval).await;
}
};
Poller::new_with_layer(
stream,
async {
futures::join!(heartbeat, ack_heartbeat);
futures::join!(heartbeat, ack_heartbeat, reenqueue_beat);
},
layer,
)
Expand Down Expand Up @@ -493,27 +506,22 @@ impl<T, C: Codec> MysqlStorage<T, C> {
}

/// Readd jobs that are abandoned to the queue
pub async fn reenqueue_orphaned(&self, timeout: i64) -> Result<bool, sqlx::Error> {
pub async fn reenqueue_orphaned(
&self,
count: i32,
dead_since: DateTime<Utc>,
) -> Result<bool, sqlx::Error> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = r#"Update jobs
INNER JOIN ( SELECT workers.id as worker_id, jobs.id as job_id from workers INNER JOIN jobs ON jobs.lock_by = workers.id WHERE jobs.status = "Running" AND workers.last_seen < ? AND workers.worker_type = ?
ORDER BY lock_at ASC LIMIT ?) as workers ON jobs.lock_by = workers.worker_id AND jobs.id = workers.job_id
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned";"#;
let now = Utc::now().timestamp();
let seconds_ago = DateTime::from_timestamp(now - timeout, 0).ok_or(sqlx::Error::Io(
io::Error::new(io::ErrorKind::InvalidData, "Invalid timeout"),
))?;

sqlx::query(query)
.bind(seconds_ago)
.bind(dead_since)
.bind(job_type)
.bind::<i64>(
self.config
.buffer_size
.try_into()
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?,
)
.bind(count)
.execute(&mut *tx)
.await?;
Ok(true)
Expand Down Expand Up @@ -686,6 +694,8 @@ mod tests {
// register a worker not responding since 6 minutes ago
let worker_id = WorkerId::new("test-worker");

let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60);

let six_minutes_ago = Utc::now() - Duration::from_secs(60 * 6);

storage
Expand All @@ -699,20 +709,24 @@ mod tests {

assert_eq!(*ctx.status(), State::Running);

storage.reenqueue_orphaned(300).await.unwrap();
storage
.reenqueue_orphaned(1, five_minutes_ago)
.await
.unwrap();

// then, the job status has changed to Pending
let job = storage
.fetch_by_id(&job.parts.task_id)
.await
.unwrap()
.unwrap();
let context = job.parts.context;
assert_eq!(*context.status(), State::Pending);
assert!(context.lock_by().is_none());
assert!(context.lock_at().is_none());
assert!(context.done_at().is_none());
assert_eq!(*context.last_error(), Some("Job was abandoned".to_string()));
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Pending);
assert!(ctx.done_at().is_none());
assert!(ctx.lock_by().is_none());
assert!(ctx.lock_at().is_none());
assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned()));
assert_eq!(job.parts.attempt.current(), 1);
}

#[tokio::test]
Expand All @@ -727,6 +741,7 @@ mod tests {

// register a worker responding at 4 minutes ago
let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60);
let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);

let worker_id = WorkerId::new("test-worker");
storage
Expand All @@ -741,17 +756,22 @@ mod tests {
assert_eq!(*ctx.status(), State::Running);

// heartbeat with ReenqueueOrpharned pulse
storage.reenqueue_orphaned(300).await.unwrap();
storage
.reenqueue_orphaned(1, six_minutes_ago)
.await
.unwrap();

// then, the job status is not changed
let job = storage
.fetch_by_id(&job.parts.task_id)
.await
.unwrap()
.unwrap();
let context = job.parts.context;
// TODO: Fix assertions
assert_eq!(*context.status(), State::Running);
assert_eq!(*context.lock_by(), Some(worker_id.clone()));
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Running);
assert_eq!(*ctx.lock_by(), Some(worker_id));
assert!(ctx.lock_at().is_some());
assert_eq!(*ctx.last_error(), None);
assert_eq!(job.parts.attempt.current(), 1);
}
}
Loading
Loading