feat: add recovery of abandoned jobs to backend heartbeats
geofmureithi committed Nov 22, 2024
1 parent e7dad8c commit e4d982d
Showing 5 changed files with 147 additions and 33 deletions.
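The commit threads a new `reenqueue_orphaned_after` window through each backend's `Config` and wires an orphan sweep into the existing heartbeats. As a minimal sketch of the builder API added below, assuming `Config` is re-exported at the crate root, shrinking the window from its 5-minute default looks like this:

```rust
use std::time::Duration;
use apalis_redis::Config;

fn main() {
    // Jobs whose worker has been silent for 2 minutes are swept back
    // into the queue instead of waiting out the 5-minute default.
    let config = Config::default()
        .set_reenqueue_orphaned_after(Duration::from_secs(120));
    assert_eq!(config.reenqueue_orphaned_after(), Duration::from_secs(120));
}
```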
49 changes: 43 additions & 6 deletions packages/apalis-redis/src/storage.rs
@@ -12,7 +12,7 @@ use apalis_core::task::namespace::Namespace;
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::{Backend, Codec};
use chrono::Utc;
use chrono::{DateTime, Utc};
use futures::channel::mpsc::{self, Sender};
use futures::{select, FutureExt, SinkExt, StreamExt, TryFutureExt};
use log::*;
@@ -114,6 +114,7 @@ pub struct Config {
max_retries: usize,
keep_alive: Duration,
enqueue_scheduled: Duration,
reenqueue_orphaned_after: Duration,
namespace: String,
}

@@ -125,6 +126,7 @@ impl Default for Config {
max_retries: 5,
keep_alive: Duration::from_secs(30),
enqueue_scheduled: Duration::from_secs(30),
reenqueue_orphaned_after: Duration::from_secs(300),
namespace: String::from("apalis_redis"),
}
}
@@ -277,6 +279,25 @@ impl Config {
pub fn signal_list(&self) -> String {
SIGNAL_LIST.replace("{queue}", &self.namespace)
}

/// Gets the reenqueue_orphaned_after duration.
pub fn reenqueue_orphaned_after(&self) -> Duration {
self.reenqueue_orphaned_after
}

/// Gets a mutable reference to the reenqueue_orphaned_after duration.
pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
&mut self.reenqueue_orphaned_after
}

/// Occasionally some workers die or abandon jobs because of panics.
/// This is how long a worker may go unseen before its running jobs are returned to the queue.
///
/// Defaults to 5 minutes
pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
self.reenqueue_orphaned_after = after;
self
}
}

/// Represents a [Storage] that uses Redis for storage.
@@ -400,6 +421,9 @@ where
let config = self.config.clone();
let stream: RequestStream<Request<T, RedisContext>> = Box::pin(rx);
let heartbeat = async move {
let mut reenqueue_orphaned_stm =
apalis_core::interval::interval(config.poll_interval).fuse();

let mut keep_alive_stm = apalis_core::interval::interval(config.keep_alive).fuse();

let mut enqueue_scheduled_stm =
@@ -448,6 +472,13 @@ where
}
}
}
_ = reenqueue_orphaned_stm.next() => {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = self.reenqueue_orphaned((config.buffer_size * 10) as i32, dead_since).await {
error!("ReenqueueOrphanedError: {}", e);
}
}
};
}
};
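The sweep rides on the storage's existing heartbeat task: one more fused interval stream in the `select!`. A reduced sketch of that multiplexing shape, with the storage calls elided as comments:

```rust
use futures::{select, StreamExt};
use std::time::Duration;

// Reduced sketch of the heartbeat loop above: periodic chores
// multiplexed on a single task via fused interval streams.
async fn heartbeat_loop(keep_alive: Duration, poll_interval: Duration) {
    let mut keep_alive_stm = apalis_core::interval::interval(keep_alive).fuse();
    let mut reenqueue_orphaned_stm = apalis_core::interval::interval(poll_interval).fuse();
    loop {
        select! {
            _ = keep_alive_stm.next() => {
                // keep_alive(&worker_id) would run here.
            }
            _ = reenqueue_orphaned_stm.next() => {
                // reenqueue_orphaned(count, dead_since) would run here.
            }
        }
    }
}
```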
@@ -875,17 +906,21 @@ where
.invoke_async(&mut self.conn)
.await
}
/// Re-enqueue some jobs that might be orphaned.
/// Re-enqueue some jobs that might be orphaned, i.e. jobs whose worker has not been seen since `dead_since`
pub async fn reenqueue_orphaned(
&mut self,
count: usize,
dead_since: i64,
count: i32,
dead_since: DateTime<Utc>,
) -> Result<usize, RedisError> {
let reenqueue_orphaned = self.scripts.reenqueue_orphaned.clone();
let consumers_set = self.config.consumers_set();
let active_jobs_list = self.config.active_jobs_list();
let signal_list = self.config.signal_list();

let now = Utc::now();
let duration = now.signed_duration_since(dead_since);
let dead_since = duration.num_seconds();

let res: Result<usize, RedisError> = reenqueue_orphaned
.key(consumers_set)
.key(active_jobs_list)
@@ -1044,8 +1079,9 @@ mod tests {
let worker_id = register_worker_at(&mut storage).await;

let _job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
.reenqueue_orphaned(5, 300)
.reenqueue_orphaned(5, dead_since)
.await
.expect("failed to reenqueue_orphaned");
}
@@ -1059,8 +1095,9 @@
let worker_id = register_worker_at(&mut storage).await;

let _job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
.reenqueue_orphaned(5, 300)
.reenqueue_orphaned(5, dead_since)
.await
.expect("failed to reenqueue_orphaned");
}
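One subtlety in the Redis path: the heartbeat computes a `DateTime<Utc>` cutoff from the configured window, and `reenqueue_orphaned` folds that cutoff back into whole seconds for the Lua script. A sketch of the round trip; the function names are illustrative, not part of the crate:

```rust
use chrono::{DateTime, Utc};
use std::time::Duration;

// Cutoff as computed in the heartbeat: a worker last seen before this
// instant is considered dead.
fn dead_since_cutoff(window: Duration) -> DateTime<Utc> {
    Utc::now() - chrono::Duration::from_std(window).expect("window fits chrono::Duration")
}

// Inverse conversion done inside `reenqueue_orphaned`: the Lua script
// expects "seconds dead" rather than an absolute timestamp.
fn seconds_dead(dead_since: DateTime<Utc>) -> i64 {
    Utc::now().signed_duration_since(dead_since).num_seconds()
}

fn main() {
    let cutoff = dead_since_cutoff(Duration::from_secs(300));
    assert!(seconds_dead(cutoff) >= 300);
}
```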
22 changes: 22 additions & 0 deletions packages/apalis-sql/src/lib.rs
@@ -45,6 +45,7 @@ pub struct Config {
keep_alive: Duration,
buffer_size: usize,
poll_interval: Duration,
reenqueue_orphaned_after: Duration,
namespace: String,
}

@@ -54,6 +55,7 @@ impl Default for Config {
keep_alive: Duration::from_secs(30),
buffer_size: 10,
poll_interval: Duration::from_millis(100),
reenqueue_orphaned_after: Duration::from_secs(300), // 5 minutes
namespace: String::from("apalis::sql"),
}
}
@@ -131,6 +133,26 @@ impl Config {
pub fn namespace_mut(&mut self) -> &mut String {
&mut self.namespace
}

/// Gets the reenqueue_orphaned_after duration.
pub fn reenqueue_orphaned_after(&self) -> Duration {
self.reenqueue_orphaned_after
}

/// Gets a mutable reference to the reenqueue_orphaned_after duration.
pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
&mut self.reenqueue_orphaned_after
}

/// Occasionally some workers die or abandon jobs because of panics.
/// This is how long a worker may go unseen before its running jobs are returned to the queue.
///
/// Defaults to 5 minutes
pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
self.reenqueue_orphaned_after = after;
self
}

}

/// Calculates the status from a result
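Both SQL backends share this `Config`, so one window drives the MySQL and Postgres sweeps alike. Besides the builder-style setter, the `_mut` accessor allows adjusting an existing value in place; a small sketch, assuming the crate-root export `apalis_sql::Config`:

```rust
use std::time::Duration;
use apalis_sql::Config;

fn main() {
    // Defaults to 5 minutes, per `Default for Config` above.
    let mut config = Config::default();
    *config.reenqueue_orphaned_after_mut() = Duration::from_secs(60);
    assert_eq!(config.reenqueue_orphaned_after(), Duration::from_secs(60));
}
```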
47 changes: 31 additions & 16 deletions packages/apalis-sql/src/mysql.rs
@@ -374,7 +374,7 @@ where
impl<Req, Res, C> Backend<Request<Req, SqlContext>, Res> for MysqlStorage<Req, C>
where
Req: Serialize + DeserializeOwned + Sync + Send + Unpin + 'static,
C: Debug + Codec<Compact = Value> + Clone + Send + 'static,
C: Debug + Codec<Compact = Value> + Clone + Send + 'static + Sync,
{
type Stream = BackendStream<RequestStream<Request<Req, SqlContext>>>;

@@ -387,6 +387,7 @@ where
let pool = self.pool.clone();
let ack_notify = self.ack_notify.clone();
let mut hb_storage = self.clone();
let requeue_storage = self.clone();
let stream = self
.stream_jobs(&worker, config.poll_interval, config.buffer_size)
.map_err(|e| Error::SourceError(Arc::new(Box::new(e))));
@@ -429,10 +430,23 @@ where
apalis_core::sleep(config.keep_alive).await;
}
};
let reenqueue_beat = async move {
loop {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = requeue_storage
.reenqueue_orphaned(config.buffer_size.try_into().unwrap(), dead_since)
.await
{
error!("ReenqueueOrphaned failed: {e}");
}
apalis_core::sleep(config.poll_interval).await;
}
};
Poller::new_with_layer(
stream,
async {
futures::join!(heartbeat, ack_heartbeat);
futures::join!(heartbeat, ack_heartbeat, reenqueue_beat);
},
layer,
)
@@ -493,27 +507,22 @@ impl<T, C: Codec> MysqlStorage<T, C> {
}

/// Re-add abandoned jobs to the queue
pub async fn reenqueue_orphaned(&self, timeout: i64) -> Result<bool, sqlx::Error> {
pub async fn reenqueue_orphaned(
&self,
count: i32,
dead_since: DateTime<Utc>,
) -> Result<bool, sqlx::Error> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = r#"Update jobs
INNER JOIN ( SELECT workers.id as worker_id, jobs.id as job_id from workers INNER JOIN jobs ON jobs.lock_by = workers.id WHERE jobs.status = "Running" AND workers.last_seen < ? AND workers.worker_type = ?
ORDER BY lock_at ASC LIMIT ?) as workers ON jobs.lock_by = workers.worker_id AND jobs.id = workers.job_id
SET status = "Pending", done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ="Job was abandoned";"#;
let now = Utc::now().timestamp();
let seconds_ago = DateTime::from_timestamp(now - timeout, 0).ok_or(sqlx::Error::Io(
io::Error::new(io::ErrorKind::InvalidData, "Invalid timeout"),
))?;

sqlx::query(query)
.bind(seconds_ago)
.bind(dead_since)
.bind(job_type)
.bind::<i64>(
self.config
.buffer_size
.try_into()
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?,
)
.bind(count)
.execute(&mut *tx)
.await?;
Ok(true)
@@ -699,7 +708,10 @@ mod tests {

assert_eq!(*ctx.status(), State::Running);

storage.reenqueue_orphaned(300).await.unwrap();
storage
.reenqueue_orphaned(5, six_minutes_ago)
.await
.unwrap();

// then, the job status has changed to Pending
let job = storage
Expand Down Expand Up @@ -741,7 +753,10 @@ mod tests {
assert_eq!(*ctx.status(), State::Running);

// heartbeat with ReenqueueOrphaned pulse
storage.reenqueue_orphaned(300).await.unwrap();
storage
.reenqueue_orphaned(5, four_minutes_ago)
.await
.unwrap();

// then, the job status is not changed
let job = storage
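Unlike Redis and Postgres, which fold the sweep into a `select!` over interval streams, the MySQL backend runs `reenqueue_beat` as a free-standing loop joined with the other heartbeats, at the cost of one extra cloned storage handle (`requeue_storage`). A stripped-down sketch of that shape, storage calls elided as comments:

```rust
use std::time::Duration;

// Shape of the MySQL heartbeat driver above: independent async loops,
// each pacing itself with apalis_core::sleep, joined on one task.
async fn run_heartbeats(keep_alive: Duration, poll_interval: Duration) {
    let keep_alive_beat = async {
        loop {
            // hb_storage.keep_alive(...) would run here.
            apalis_core::sleep(keep_alive).await;
        }
    };
    let reenqueue_beat = async {
        loop {
            // requeue_storage.reenqueue_orphaned(count, dead_since) would run here.
            apalis_core::sleep(poll_interval).await;
        }
    };
    futures::join!(keep_alive_beat, reenqueue_beat);
}
```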
23 changes: 19 additions & 4 deletions packages/apalis-sql/src/postgres.rs
@@ -137,6 +137,8 @@ where
let pool = self.pool.clone();
let heartbeat = async move {
let mut keep_alive_stm = apalis_core::interval::interval(config.keep_alive).fuse();
let mut reenqueue_orphaned_stm =
apalis_core::interval::interval(config.poll_interval).fuse();
let mut ack_stream = ack_notify.clone().ready_chunks(config.buffer_size).fuse();

let mut poll_next_stm = apalis_core::interval::interval(config.poll_interval).fuse();
@@ -221,6 +223,13 @@ where
error!("PgNotificationError: {e}");
}
}
_ = reenqueue_orphaned_stm.next() => {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = self.reenqueue_orphaned((config.buffer_size * 10) as i32, dead_since).await {
error!("ReenqueueOrphanedError: {}", e);
}
}


};
@@ -615,18 +624,24 @@ impl<T, C: Codec> PostgresStorage<T, C> {
}

/// Reenqueue jobs that have been abandoned by their workers
pub async fn reenqueue_orphaned(&mut self, count: i32) -> Result<(), sqlx::Error> {
pub async fn reenqueue_orphaned(
&mut self,
count: i32,
dead_since: DateTime<Utc>,
) -> Result<(), sqlx::Error> {
let job_type = self.config.namespace.clone();
let mut tx = self.pool.acquire().await?;
let query = "Update apalis.jobs
SET status = 'Pending', done_at = NULL, lock_by = NULL, lock_at = NULL, last_error ='Job was abandoned'
WHERE id in
(SELECT jobs.id from apalis.jobs INNER join apalis.workers ON lock_by = workers.id
WHERE status= 'Running' AND workers.last_seen < (NOW() - INTERVAL '300 seconds')
WHERE status= 'Running' AND workers.last_seen < $3
AND workers.worker_type = $1 ORDER BY lock_at ASC LIMIT $2);";

sqlx::query(query)
.bind(job_type)
.bind(count)
.bind(dead_since)
.execute(&mut *tx)
.await?;
Ok(())
@@ -789,7 +804,7 @@ mod tests {

let job = consume_one(&mut storage, &worker_id).await;
storage
.reenqueue_orphaned(5)
.reenqueue_orphaned(5, six_minutes_ago)
.await
.expect("failed to heartbeat");
let job_id = &job.parts.task_id;
Expand Down Expand Up @@ -818,7 +833,7 @@ mod tests {

assert_eq!(*ctx.status(), State::Running);
storage
.reenqueue_orphaned(5)
.reenqueue_orphaned(5, four_minutes_ago)
.await
.expect("failed to heartbeat");

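The Postgres sweep previously hardcoded a five-minute window (`INTERVAL '300 seconds'`); the cutoff is now a bind parameter, so it tracks the configured `reenqueue_orphaned_after`. A hedged sketch of driving the sweep by hand, assuming an already-connected `PostgresStorage` with its default codec parameter and a hypothetical `Email` job type:

```rust
use apalis_sql::postgres::PostgresStorage;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::time::Duration;

// Hypothetical job payload, for illustration only.
#[derive(Serialize, Deserialize)]
struct Email {
    to: String,
}

// Re-enqueue up to 10 jobs whose worker has been silent for 5+ minutes.
async fn sweep(storage: &mut PostgresStorage<Email>) -> Result<(), sqlx::Error> {
    let dead_since = Utc::now()
        - chrono::Duration::from_std(Duration::from_secs(300)).expect("valid window");
    storage.reenqueue_orphaned(10, dead_since).await
}
```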
(Diff for the fifth changed file did not load.)
