Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reenque_orphaned for RedisStorage #468

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@
type Error = RedisError;
type Context = RedisContext;

async fn push_request(

Check warning on line 685 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Clippy

this function depends on never type fallback being `()`

Check warning on line 685 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 685 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 685 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
req: Request<T, RedisContext>,
) -> Result<Parts<Self::Context>, RedisError> {
Expand All @@ -705,7 +705,7 @@
Ok(req.parts)
}

async fn schedule_request(

Check warning on line 708 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 708 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 708 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
req: Request<Self::Job, RedisContext>,
on: i64,
Expand Down Expand Up @@ -768,7 +768,7 @@
Ok(())
}

async fn reschedule(

Check warning on line 771 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 771 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 771 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
job: Request<T, RedisContext>,
wait: Duration,
Expand Down Expand Up @@ -827,7 +827,7 @@
C: Codec<Compact = Vec<u8>> + Send + 'static,
{
/// Attempt to retry a job
pub async fn retry(&mut self, worker_id: &WorkerId, task_id: &TaskId) -> Result<i32, RedisError>

Check warning on line 830 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 830 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

this function depends on never type fallback being `()`

Check warning on line 830 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
where
T: Send + DeserializeOwned + Serialize + Unpin + Sync + 'static,
{
Expand Down Expand Up @@ -947,9 +947,7 @@
let active_jobs_list = self.config.active_jobs_list();
let signal_list = self.config.signal_list();

let now = Utc::now();
let duration = now.signed_duration_since(dead_since);
let dead_since = duration.num_seconds();
let dead_since = dead_since.timestamp();

let res: Result<usize, RedisError> = reenqueue_orphaned
.key(consumers_set)
Expand All @@ -968,7 +966,7 @@

#[cfg(test)]
mod tests {
use apalis_core::generic_storage_test;
use apalis_core::{generic_storage_test, sleep};
use email_service::Email;

use apalis_core::test_utils::apalis_test_service_fn;
Expand Down Expand Up @@ -1101,19 +1099,22 @@
}

#[tokio::test]
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() {
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_1sec() {
let mut storage = setup().await;

push_email(&mut storage, example_email()).await;

let worker_id = register_worker_at(&mut storage).await;

let job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
sleep(Duration::from_millis(1000)).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(1)).unwrap();
let res = storage
.reenqueue_orphaned(1, dead_since)
.await
.expect("failed to reenqueue_orphaned");
// We expect 1 job to be re-enqueued
assert_eq!(res, 1);
let job = get_job(&mut storage, &job.parts.task_id).await;
let ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Pending);
Expand All @@ -1126,19 +1127,21 @@
}

#[tokio::test]
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() {
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_5sec() {
let mut storage = setup().await;

push_email(&mut storage, example_email()).await;

let worker_id = register_worker_at(&mut storage).await;

sleep(Duration::from_millis(1100)).await;
let job = consume_one(&mut storage, &worker_id).await;
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(300)).unwrap();
storage
let dead_since = Utc::now() - chrono::Duration::from_std(Duration::from_secs(5)).unwrap();
let res = storage
.reenqueue_orphaned(1, dead_since)
.await
.expect("failed to reenqueue_orphaned");
// We expect 0 job to be re-enqueued
assert_eq!(res, 0);
let job = get_job(&mut storage, &job.parts.task_id).await;
let _ctx = &job.parts.context;
// assert_eq!(*ctx.status(), State::Running);
Expand Down
Loading